001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetSocketAddress;
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellScanner;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.Threads;
032import org.apache.yetus.audience.InterfaceAudience;
033
034import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
035import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
036import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
037
038import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
039import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
040import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
041import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
042import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
047
048@InterfaceAudience.Private
049public class TestProtobufRpcServiceImpl implements BlockingInterface {
050
051  public static final BlockingService SERVICE =
052    TestProtobufRpcProto.newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
053
054  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
055    throws IOException {
056    return newBlockingStub(client, addr, User.getCurrent());
057  }
058
059  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
060    User user) throws IOException {
061    return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
062      ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()),
063      user, 0));
064  }
065
066  public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
067    return TestProtobufRpcProto.newStub(client.createRpcChannel(
068      ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()),
069      User.getCurrent(), 0));
070  }
071
072  @Override
073  public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
074    throws ServiceException {
075    return EmptyResponseProto.getDefaultInstance();
076  }
077
078  @Override
079  public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
080    throws ServiceException {
081    if (controller instanceof HBaseRpcController) {
082      HBaseRpcController pcrc = (HBaseRpcController) controller;
083      // If cells, scan them to check we are able to iterate what we were given and since this is an
084      // echo, just put them back on the controller creating a new block. Tests our block building.
085      CellScanner cellScanner = pcrc.cellScanner();
086      List<Cell> list = null;
087      if (cellScanner != null) {
088        list = new ArrayList<>();
089        try {
090          while (cellScanner.advance()) {
091            list.add(cellScanner.current());
092          }
093        } catch (IOException e) {
094          throw new ServiceException(e);
095        }
096      }
097      cellScanner = CellUtil.createCellScanner(list);
098      pcrc.setCellScanner(cellScanner);
099    }
100    return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
101  }
102
103  @Override
104  public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
105    throws ServiceException {
106    throw new ServiceException(new DoNotRetryIOException("server error!"));
107  }
108
109  @Override
110  public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
111    throws ServiceException {
112    Threads.sleepWithoutInterrupt(request.getMs());
113    return EmptyResponseProto.getDefaultInstance();
114  }
115
116  @Override
117  public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
118    throws ServiceException {
119    return AddrResponseProto.newBuilder()
120      .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build();
121  }
122}