001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
021import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
022import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
023
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.util.ArrayList;
027import java.util.List;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
036import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
037import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
038import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
039import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
040import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
041import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
042import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.util.Threads;
046
047@InterfaceAudience.Private
048public class TestProtobufRpcServiceImpl implements BlockingInterface {
049
050  public static final BlockingService SERVICE = TestProtobufRpcProto
051      .newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
052
053  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
054      throws IOException {
055    return newBlockingStub(client, addr, User.getCurrent());
056  }
057
058  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
059      User user) throws IOException {
060    return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
061      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0));
062  }
063
064  public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
065    return TestProtobufRpcProto.newStub(client.createRpcChannel(
066      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
067      User.getCurrent(), 0));
068  }
069
070  @Override
071  public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
072      throws ServiceException {
073    return EmptyResponseProto.getDefaultInstance();
074  }
075
076  @Override
077  public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
078      throws ServiceException {
079    if (controller instanceof HBaseRpcController) {
080      HBaseRpcController pcrc = (HBaseRpcController) controller;
081      // If cells, scan them to check we are able to iterate what we were given and since this is an
082      // echo, just put them back on the controller creating a new block. Tests our block building.
083      CellScanner cellScanner = pcrc.cellScanner();
084      List<Cell> list = null;
085      if (cellScanner != null) {
086        list = new ArrayList<>();
087        try {
088          while (cellScanner.advance()) {
089            list.add(cellScanner.current());
090          }
091        } catch (IOException e) {
092          throw new ServiceException(e);
093        }
094      }
095      cellScanner = CellUtil.createCellScanner(list);
096      pcrc.setCellScanner(cellScanner);
097    }
098    return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
099  }
100
101  @Override
102  public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
103      throws ServiceException {
104    throw new ServiceException(new DoNotRetryIOException("server error!"));
105  }
106
107  @Override
108  public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
109      throws ServiceException {
110    Threads.sleepWithoutInterrupt(request.getMs());
111    return EmptyResponseProto.getDefaultInstance();
112  }
113
114  @Override
115  public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
116      throws ServiceException {
117    return AddrResponseProto.newBuilder()
118        .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build();
119  }
120}