001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import com.google.protobuf.RpcCallback;
025import com.google.protobuf.RpcController;
026import com.google.protobuf.Service;
027import com.google.protobuf.ServiceException;
028import java.io.IOException;
029import java.util.Collections;
030import java.util.Map;
031import org.apache.hadoop.hbase.CoprocessorEnvironment;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.RegionLocator;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.coprocessor.Batch;
040import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
041import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
045import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
046import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
047import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
048import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
049import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
050import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
051import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
052import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
053import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
054import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069@Category({RegionServerTests.class, MediumTests.class})
070public class TestServerCustomProtocol {
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestServerCustomProtocol.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestServerCustomProtocol.class);
076  static final String WHOAREYOU = "Who are you?";
077  static final String NOBODY = "nobody";
078  static final String HELLO = "Hello, ";
079
080  /* Test protocol implementation */
081  public static class PingHandler extends PingProtos.PingService implements RegionCoprocessor {
082    private int counter = 0;
083
084    @Override
085    public void start(CoprocessorEnvironment env) throws IOException {
086      if (env instanceof RegionCoprocessorEnvironment) {
087        return;
088      }
089      throw new CoprocessorException("Must be loaded on a table region!");
090    }
091
092    @Override
093    public void stop(CoprocessorEnvironment env) throws IOException {
094      // Nothing to do.
095    }
096
097    @Override
098    public void ping(RpcController controller, PingRequest request,
099        RpcCallback<PingResponse> done) {
100      this.counter++;
101      done.run(PingResponse.newBuilder().setPong("pong").build());
102    }
103
104    @Override
105    public void count(RpcController controller, CountRequest request,
106        RpcCallback<CountResponse> done) {
107      done.run(CountResponse.newBuilder().setCount(this.counter).build());
108    }
109
110    @Override
111    public void increment(RpcController controller,
112        IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
113      this.counter += request.getDiff();
114      done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
115    }
116
117    @Override
118    public void hello(RpcController controller, HelloRequest request,
119        RpcCallback<HelloResponse> done) {
120      if (!request.hasName()) {
121        done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
122      } else if (request.getName().equals(NOBODY)) {
123        done.run(HelloResponse.newBuilder().build());
124      } else {
125        done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
126      }
127    }
128
129    @Override
130    public void noop(RpcController controller, NoopRequest request,
131        RpcCallback<NoopResponse> done) {
132      done.run(NoopResponse.newBuilder().build());
133    }
134
135    @Override
136    public Iterable<Service> getServices() {
137      return Collections.singleton(this);
138    }
139  }
140
141  private static final TableName TEST_TABLE = TableName.valueOf("test");
142  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
143
144  private static final byte[] ROW_A = Bytes.toBytes("aaa");
145  private static final byte[] ROW_B = Bytes.toBytes("bbb");
146  private static final byte[] ROW_C = Bytes.toBytes("ccc");
147
148  private static final byte[] ROW_AB = Bytes.toBytes("abb");
149  private static final byte[] ROW_BC = Bytes.toBytes("bcc");
150
151  private static HBaseTestingUtility util = new HBaseTestingUtility();
152
153  @BeforeClass
154  public static void setupBeforeClass() throws Exception {
155    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
156      PingHandler.class.getName());
157    util.startMiniCluster();
158  }
159
160  @Before
161  public void before() throws Exception {
162    final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
163    Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
164
165    Put puta = new Put(ROW_A);
166    puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
167    table.put(puta);
168
169    Put putb = new Put(ROW_B);
170    putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
171    table.put(putb);
172
173    Put putc = new Put(ROW_C);
174    putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
175    table.put(putc);
176  }
177
178  @After
179  public void after() throws Exception {
180    util.deleteTable(TEST_TABLE);
181  }
182
183  @AfterClass
184  public static void tearDownAfterClass() throws Exception {
185    util.shutdownMiniCluster();
186  }
187
188  @Test
189  public void testSingleProxy() throws Throwable {
190    Table table = util.getConnection().getTable(TEST_TABLE);
191    Map<byte [], String> results = ping(table, null, null);
192    // There are three regions so should get back three results.
193    assertEquals(3, results.size());
194    for (Map.Entry<byte [], String> e: results.entrySet()) {
195      assertEquals("Invalid custom protocol response", "pong", e.getValue());
196    }
197    hello(table, "George", HELLO + "George");
198    LOG.info("Did george");
199    hello(table, null, "Who are you?");
200    LOG.info("Who are you");
201    hello(table, NOBODY, null);
202    LOG.info(NOBODY);
203    Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class,
204      null, null,
205      new Batch.Call<PingProtos.PingService, Integer>() {
206        @Override
207        public Integer call(PingProtos.PingService instance) throws IOException {
208          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
209            new CoprocessorRpcUtils.BlockingRpcCallback<>();
210          instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
211          return rpcCallback.get().getCount();
212        }
213      });
214    int count = -1;
215    for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
216      assertTrue(e.getValue() > 0);
217      count = e.getValue();
218    }
219    final int diff = 5;
220    intResults = table.coprocessorService(PingProtos.PingService.class,
221      null, null,
222      new Batch.Call<PingProtos.PingService, Integer>() {
223        @Override
224        public Integer call(PingProtos.PingService instance) throws IOException {
225          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
226            new CoprocessorRpcUtils.BlockingRpcCallback<>();
227          instance.increment(null,
228              PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
229            rpcCallback);
230          return rpcCallback.get().getCount();
231        }
232      });
233    // There are three regions so should get back three results.
234    assertEquals(3, results.size());
235    for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
236      assertEquals(e.getValue().intValue(), count + diff);
237    }
238    table.close();
239  }
240
241  private Map<byte [], String> hello(final Table table, final String send, final String response)
242          throws ServiceException, Throwable {
243    Map<byte [], String> results = hello(table, send);
244    for (Map.Entry<byte [], String> e: results.entrySet()) {
245      assertEquals("Invalid custom protocol response", response, e.getValue());
246    }
247    return results;
248  }
249
250  private Map<byte [], String> hello(final Table table, final String send)
251          throws ServiceException, Throwable {
252    return hello(table, send, null, null);
253  }
254
255  private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
256          final byte [] end) throws ServiceException, Throwable {
257    return table.coprocessorService(PingProtos.PingService.class,
258        start, end,
259        new Batch.Call<PingProtos.PingService, String>() {
260          @Override
261          public String call(PingProtos.PingService instance) throws IOException {
262            CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
263              new CoprocessorRpcUtils.BlockingRpcCallback<>();
264            PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
265            if (send != null) {
266              builder.setName(send);
267            }
268            instance.hello(null, builder.build(), rpcCallback);
269            PingProtos.HelloResponse r = rpcCallback.get();
270            return r != null && r.hasResponse()? r.getResponse(): null;
271          }
272        });
273  }
274
275  private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
276          final byte [] end) throws ServiceException, Throwable {
277    return table.coprocessorService(PingProtos.PingService.class,
278        start, end,
279        new Batch.Call<PingProtos.PingService, String>() {
280          @Override
281          public String call(PingProtos.PingService instance) throws IOException {
282            CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
283              new CoprocessorRpcUtils.BlockingRpcCallback<>();
284            PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
285            // Call ping on same instance.  Use result calling hello on same instance.
286            builder.setName(doPing(instance));
287            instance.hello(null, builder.build(), rpcCallback);
288            PingProtos.HelloResponse r = rpcCallback.get();
289            return r != null && r.hasResponse()? r.getResponse(): null;
290          }
291        });
292  }
293
294  private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end)
295          throws ServiceException, Throwable {
296    return table.coprocessorService(PingProtos.PingService.class, start, end,
297        new Batch.Call<PingProtos.PingService, String>() {
298          @Override
299          public String call(PingProtos.PingService instance) throws IOException {
300            CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
301              new CoprocessorRpcUtils.BlockingRpcCallback<>();
302            PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
303            instance.noop(null, builder.build(), rpcCallback);
304            rpcCallback.get();
305            // Looks like null is expected when void.  That is what the test below is looking for
306            return null;
307          }
308        });
309  }
310
311  @Test
312  public void testSingleMethod() throws Throwable {
313    try (Table table = util.getConnection().getTable(TEST_TABLE);
314        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
315      Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
316        null, ROW_A,
317        new Batch.Call<PingProtos.PingService, String>() {
318          @Override
319          public String call(PingProtos.PingService instance) throws IOException {
320            CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
321              new CoprocessorRpcUtils.BlockingRpcCallback<>();
322            instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
323            return rpcCallback.get().getPong();
324          }
325        });
326      // Should have gotten results for 1 of the three regions only since we specified
327      // rows from 1 region
328      assertEquals(1, results.size());
329      verifyRegionResults(locator, results, ROW_A);
330
331      final String name = "NAME";
332      results = hello(table, name, null, ROW_A);
333      // Should have gotten results for 1 of the three regions only since we specified
334      // rows from 1 region
335      assertEquals(1, results.size());
336      verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
337    }
338  }
339
340  @Test
341  public void testRowRange() throws Throwable {
342    try (Table table = util.getConnection().getTable(TEST_TABLE);
343        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
344      for (HRegionLocation e: locator.getAllRegionLocations()) {
345        LOG.info("Region " + e.getRegionInfo().getRegionNameAsString()
346            + ", servername=" + e.getServerName());
347      }
348      // Here are what regions looked like on a run:
349      //
350      // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
351      // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
352      // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.
353
354      Map<byte [], String> results = ping(table, null, ROW_A);
355      // Should contain first region only.
356      assertEquals(1, results.size());
357      verifyRegionResults(locator, results, ROW_A);
358
359      // Test start row + empty end
360      results = ping(table, ROW_BC, null);
361      assertEquals(2, results.size());
362      // should contain last 2 regions
363      HRegionLocation loc = locator.getRegionLocation(ROW_A, true);
364      assertNull("Should be missing region for row aaa (prior to start row)",
365        results.get(loc.getRegionInfo().getRegionName()));
366      verifyRegionResults(locator, results, ROW_B);
367      verifyRegionResults(locator, results, ROW_C);
368
369      // test empty start + end
370      results = ping(table, null, ROW_BC);
371      // should contain the first 2 regions
372      assertEquals(2, results.size());
373      verifyRegionResults(locator, results, ROW_A);
374      verifyRegionResults(locator, results, ROW_B);
375      loc = locator.getRegionLocation(ROW_C, true);
376      assertNull("Should be missing region for row ccc (past stop row)",
377          results.get(loc.getRegionInfo().getRegionName()));
378
379      // test explicit start + end
380      results = ping(table, ROW_AB, ROW_BC);
381      // should contain first 2 regions
382      assertEquals(2, results.size());
383      verifyRegionResults(locator, results, ROW_A);
384      verifyRegionResults(locator, results, ROW_B);
385      loc = locator.getRegionLocation(ROW_C, true);
386      assertNull("Should be missing region for row ccc (past stop row)",
387          results.get(loc.getRegionInfo().getRegionName()));
388
389      // test single region
390      results = ping(table, ROW_B, ROW_BC);
391      // should only contain region bbb
392      assertEquals(1, results.size());
393      verifyRegionResults(locator, results, ROW_B);
394      loc = locator.getRegionLocation(ROW_A, true);
395      assertNull("Should be missing region for row aaa (prior to start)",
396          results.get(loc.getRegionInfo().getRegionName()));
397      loc = locator.getRegionLocation(ROW_C, true);
398      assertNull("Should be missing region for row ccc (past stop row)",
399          results.get(loc.getRegionInfo().getRegionName()));
400    }
401  }
402
403  private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
404          throws ServiceException, Throwable {
405    return table.coprocessorService(PingProtos.PingService.class, start, end,
406      new Batch.Call<PingProtos.PingService, String>() {
407        @Override
408        public String call(PingProtos.PingService instance) throws IOException {
409          return doPing(instance);
410        }
411      });
412  }
413
414  private static String doPing(PingProtos.PingService instance) throws IOException {
415    CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
416        new CoprocessorRpcUtils.BlockingRpcCallback<>();
417    instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
418    return rpcCallback.get().getPong();
419  }
420
421  @Test
422  public void testCompoundCall() throws Throwable {
423    try (Table table = util.getConnection().getTable(TEST_TABLE);
424        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
425      Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
426      verifyRegionResults(locator, results, "Hello, pong", ROW_A);
427      verifyRegionResults(locator, results, "Hello, pong", ROW_B);
428      verifyRegionResults(locator, results, "Hello, pong", ROW_C);
429    }
430  }
431
432  @Test
433  public void testNullCall() throws Throwable {
434    try (Table table = util.getConnection().getTable(TEST_TABLE);
435        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
436      Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
437      verifyRegionResults(locator, results, "Who are you?", ROW_A);
438      verifyRegionResults(locator, results, "Who are you?", ROW_B);
439      verifyRegionResults(locator, results, "Who are you?", ROW_C);
440    }
441  }
442
443  @Test
444  public void testNullReturn() throws Throwable {
445    try (Table table = util.getConnection().getTable(TEST_TABLE);
446        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
447      Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
448      verifyRegionResults(locator, results, null, ROW_A);
449      verifyRegionResults(locator, results, null, ROW_B);
450      verifyRegionResults(locator, results, null, ROW_C);
451    }
452  }
453
454  @Test
455  public void testEmptyReturnType() throws Throwable {
456    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
457      Map<byte[],String> results = noop(table, ROW_A, ROW_C);
458      assertEquals("Should have results from three regions", 3, results.size());
459      // all results should be null
460      for (Object v : results.values()) {
461        assertNull(v);
462      }
463    }
464  }
465
466  private void verifyRegionResults(RegionLocator table, Map<byte[],String> results, byte[] row)
467          throws Exception {
468    verifyRegionResults(table, results, "pong", row);
469  }
470
471  private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results,
472          String expected, byte[] row) throws Exception {
473    for (Map.Entry<byte [], String> e: results.entrySet()) {
474      LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
475        ", result key=" + Bytes.toString(e.getKey()) +
476        ", value=" + e.getValue());
477    }
478    HRegionLocation loc = regionLocator.getRegionLocation(row, true);
479    byte[] region = loc.getRegionInfo().getRegionName();
480    assertTrue("Results should contain region " +
481      Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
482      results.containsKey(region));
483    assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
484      expected, results.get(region));
485  }
486}