001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetSocketAddress;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.ExtendedCell;
029import org.apache.hadoop.hbase.ExtendedCellScanner;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
038import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
039import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
040import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
041import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
042
043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
047import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
048import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
052
053@InterfaceAudience.Private
054public class TestProtobufRpcServiceImpl implements BlockingInterface, Interface {
055
056  public static final BlockingService SERVICE =
057    TestProtobufRpcProto.newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
058
059  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
060    throws IOException {
061    return newBlockingStub(client, addr, User.getCurrent());
062  }
063
064  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
065    User user) throws IOException {
066    return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
067      ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()),
068      user, 0));
069  }
070
071  public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
072    return TestProtobufRpcProto.newStub(client.createRpcChannel(
073      ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()),
074      User.getCurrent(), 0));
075  }
076
077  @Override
078  public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
079    throws ServiceException {
080    return EmptyResponseProto.getDefaultInstance();
081  }
082
083  @Override
084  public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
085    throws ServiceException {
086    if (controller instanceof HBaseRpcController) {
087      HBaseRpcController pcrc = (HBaseRpcController) controller;
088      // If cells, scan them to check we are able to iterate what we were given and since this is an
089      // echo, just put them back on the controller creating a new block. Tests our block building.
090      ExtendedCellScanner cellScanner = pcrc.cellScanner();
091      List<ExtendedCell> list = null;
092      if (cellScanner != null) {
093        list = new ArrayList<>();
094        try {
095          while (cellScanner.advance()) {
096            list.add(cellScanner.current());
097          }
098        } catch (IOException e) {
099          throw new ServiceException(e);
100        }
101      }
102      cellScanner = PrivateCellUtil.createExtendedCellScanner(list);
103      pcrc.setCellScanner(cellScanner);
104    }
105    return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
106  }
107
108  @Override
109  public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
110    throws ServiceException {
111    throw new ServiceException(new DoNotRetryIOException("server error!"));
112  }
113
114  @Override
115  public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
116    throws ServiceException {
117    Threads.sleepWithoutInterrupt(request.getMs());
118    return EmptyResponseProto.getDefaultInstance();
119  }
120
121  @Override
122  public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
123    throws ServiceException {
124    return AddrResponseProto.newBuilder()
125      .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build();
126  }
127
128  @Override
129  public void ping(RpcController controller, EmptyRequestProto request,
130    RpcCallback<EmptyResponseProto> done) {
131    done.run(EmptyResponseProto.getDefaultInstance());
132  }
133
134  @Override
135  public void echo(RpcController controller, EchoRequestProto request,
136    RpcCallback<EchoResponseProto> done) {
137    if (controller instanceof HBaseRpcController) {
138      HBaseRpcController pcrc = (HBaseRpcController) controller;
139      // If cells, scan them to check we are able to iterate what we were given and since this is an
140      // echo, just put them back on the controller creating a new block. Tests our block building.
141      ExtendedCellScanner cellScanner = pcrc.cellScanner();
142      List<ExtendedCell> list = null;
143      if (cellScanner != null) {
144        list = new ArrayList<>();
145        try {
146          while (cellScanner.advance()) {
147            list.add(cellScanner.current());
148          }
149        } catch (IOException e) {
150          pcrc.setFailed(e);
151          return;
152        }
153      }
154      cellScanner = PrivateCellUtil.createExtendedCellScanner(list);
155      pcrc.setCellScanner(cellScanner);
156    }
157    done.run(EchoResponseProto.newBuilder().setMessage(request.getMessage()).build());
158  }
159
160  @Override
161  public void error(RpcController controller, EmptyRequestProto request,
162    RpcCallback<EmptyResponseProto> done) {
163    if (controller instanceof HBaseRpcController) {
164      ((HBaseRpcController) controller).setFailed(new DoNotRetryIOException("server error!"));
165    } else {
166      controller.setFailed("server error!");
167    }
168  }
169
170  private final ScheduledExecutorService executor =
171    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
172
173  @Override
174  public void pause(RpcController controller, PauseRequestProto request,
175    RpcCallback<EmptyResponseProto> done) {
176    executor.schedule(() -> done.run(EmptyResponseProto.getDefaultInstance()), request.getMs(),
177      TimeUnit.MILLISECONDS);
178  }
179
180  @Override
181  public void addr(RpcController controller, EmptyRequestProto request,
182    RpcCallback<AddrResponseProto> done) {
183    done.run(AddrResponseProto.newBuilder()
184      .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build());
185  }
186}