/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

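/**
 * Integration test that exercises the HBase RPC client implementations (blocking and Netty)
 * against a small in-process cluster of test RPC servers. Multiple {@link SimpleClient} threads
 * issue echo calls while a {@link MiniChaosMonkey} randomly starts and stops servers.
 */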
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }

  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
      @Override
      Codec getCodec() {
        return null;
      }
    };
  }

  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();
    while (builder.length() < 1024 * 1024) { // ~1M chars, roughly 2 MB of UTF-16 in memory
      builder.append("big.payload.");
    }
    BIG_PAYLOAD = builder.toString();
  }

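  /**
   * A dynamically sized set of test RPC servers. Servers are started and stopped under a write
   * lock, keeping the population between {@code minServers} and {@code maxServers}.
   */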
  class Cluster {
    Random random = new Random();
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
    List<RpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    RpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
            Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
            new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      RpcServer rpcServer = null;
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw exception here. We can't remove this instance from the server map because
          // we no longer have access to its map key
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);

        if (rpcServer != null) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(RpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (RpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    RpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }

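  /**
   * Background thread that randomly starts or stops a server in the {@link Cluster} every 100 ms
   * until told to stop. The first exception seen is kept and rethrown by the test.
   */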
  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    Random random = new Random();
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        if (random.nextBoolean()) {
          // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        } else {
          // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

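  /**
   * Client thread that repeatedly picks a random server and issues a blocking echo call with
   * either a small or a large payload, verifying that the response matches the request.
   */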
  static class SimpleClient extends Thread {
    AbstractRpcClient<?> rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean sending = new AtomicBoolean(false);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;
    Random random = new Random();

    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
      this.setName(id);
    }

    @Override
    public void run() {
      while (running.get()) {
        boolean isBigPayload = random.nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret;
        RpcServer server = cluster.getRandomServer();
        try {
          sending.set(true);
          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
          ret = stub.echo(null, param);
        } catch (Exception e) {
          LOG.warn(e.toString(), e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    boolean isSending() {
      return sending.get();
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  /**
   * Tests that connections which have not yet been started are successfully removed from the
   * connection pool when the RPC client is closing.
   */
  @Test
  public void testRpcWithWriteThread() throws IOException, InterruptedException {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(1, 1);
    cluster.startServer();
    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
    for (int i = 0; i < 1000; i++) {
      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
      client.start();
      while (!client.isSending()) {
        Thread.sleep(1);
      }
      client.stopRunning();
      rpcClient.close();
    }
  }

  @Test
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 180000);
    }
  }

  @Test
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

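  /**
   * Watchdog thread that dumps all thread stacks and exits the JVM if the configured timeout
   * elapses before it is interrupted.
   */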
  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want
      }
    }

    // Runs the callable in the calling thread while a watchdog thread is injected that will
    // exit the JVM if the timeout elapses first.
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }

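  /**
   * Shared body of the chaos-monkey tests: starts 10 servers and 30 client threads that share one
   * RPC client, runs the {@link MiniChaosMonkey} for 30 seconds, then verifies that neither the
   * monkey nor any client saw an unexpected failure and that each client made progress.
   */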
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>(30);

    // all threads should share the same rpc client
    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}