001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.example;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.util.Optional;
025import java.util.concurrent.ExecutionException;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseConfiguration;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.AsyncConnection;
030import org.apache.hadoop.hbase.client.ConnectionFactory;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
038import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
039import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
040import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
041import org.apache.hbase.thirdparty.io.netty.channel.Channel;
042import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
043import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
044import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
045import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
046import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
047import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
049import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
050import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
051import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
052import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse;
053import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest;
054import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames;
055import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator;
056import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus;
057import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec;
058import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion;
059import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder;
060import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
061
062/**
063 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
064 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
065 * the HTTP server.
066 * <p>
067 * The request URL is:
068 *
069 * <pre>
 * http://&lt;host&gt;:&lt;port&gt;/&lt;table&gt;/&lt;row&gt;/&lt;family&gt;:&lt;qualifier&gt;
071 * </pre>
072 *
073 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content
074 * when doing PUT.
075 */
076@InterfaceAudience.Private
077public class HttpProxyExample {
078
079  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
080
081  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
082
083  private final Configuration conf;
084
085  private final int port;
086
087  private AsyncConnection conn;
088
089  private Channel serverChannel;
090
091  private ChannelGroup channelGroup;
092
093  public HttpProxyExample(Configuration conf, int port) {
094    this.conf = conf;
095    this.port = port;
096  }
097
098  private static final class Params {
099    public final String table;
100
101    public final String row;
102
103    public final String family;
104
105    public final String qualifier;
106
107    public Params(String table, String row, String family, String qualifier) {
108      this.table = table;
109      this.row = row;
110      this.family = family;
111      this.qualifier = qualifier;
112    }
113  }
114
115  private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
116
117    private final AsyncConnection conn;
118
119    private final ChannelGroup channelGroup;
120
121    public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) {
122      this.conn = conn;
123      this.channelGroup = channelGroup;
124    }
125
126    @Override
127    public void channelActive(ChannelHandlerContext ctx) {
128      channelGroup.add(ctx.channel());
129      ctx.fireChannelActive();
130    }
131
132    @Override
133    public void channelInactive(ChannelHandlerContext ctx) {
134      channelGroup.remove(ctx.channel());
135      ctx.fireChannelInactive();
136    }
137
138    private void write(ChannelHandlerContext ctx, HttpResponseStatus status,
139        Optional<String> content) {
140      DefaultFullHttpResponse resp;
141      if (content.isPresent()) {
142        ByteBuf buf =
143            ctx.alloc().buffer().writeBytes(Bytes.toBytes(content.get()));
144        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf);
145        resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
146      } else {
147        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
148      }
149      resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8");
150      ctx.writeAndFlush(resp);
151    }
152
153    private Params parse(FullHttpRequest req) {
154      String[] components = new QueryStringDecoder(req.uri()).path().split("/");
155      Preconditions.checkArgument(components.length == 4, "Unrecognized uri: %s", req.uri());
156      // path is start with '/' so split will give an empty component
157      String[] cfAndCq = components[3].split(":");
158      Preconditions.checkArgument(cfAndCq.length == 2, "Unrecognized uri: %s", req.uri());
159      return new Params(components[1], components[2], cfAndCq[0], cfAndCq[1]);
160    }
161
162    private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
163      Params params = parse(req);
164      addListener(
165        conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
166          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
167        (r, e) -> {
168          if (e != null) {
169            exceptionCaught(ctx, e);
170          } else {
171            byte[] value =
172              r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
173            if (value != null) {
174              write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
175            } else {
176              write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
177            }
178          }
179        });
180    }
181
182    private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
183      Params params = parse(req);
184      byte[] value = new byte[req.content().readableBytes()];
185      req.content().readBytes(value);
186      addListener(
187        conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
188          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
189        (r, e) -> {
190          if (e != null) {
191            exceptionCaught(ctx, e);
192          } else {
193            write(ctx, HttpResponseStatus.OK, Optional.empty());
194          }
195        });
196    }
197
198    @Override
199    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
200      switch (req.method().name()) {
201        case "GET":
202          get(ctx, req);
203          break;
204        case "PUT":
205          put(ctx, req);
206          break;
207        default:
208          write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, Optional.empty());
209          break;
210      }
211    }
212
213    @Override
214    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
215      if (cause instanceof IllegalArgumentException) {
216        write(ctx, HttpResponseStatus.BAD_REQUEST, Optional.of(cause.getMessage()));
217      } else {
218        write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
219          Optional.of(Throwables.getStackTraceAsString(cause)));
220      }
221    }
222  }
223
224  public void start() throws InterruptedException, ExecutionException {
225    NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class);
226    conn = ConnectionFactory.createAsyncConnection(conf).get();
227    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
228    serverChannel = new ServerBootstrap().group(bossGroup, workerGroup)
229        .channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true)
230        .childHandler(new ChannelInitializer<Channel>() {
231
232          @Override
233          protected void initChannel(Channel ch) throws Exception {
234            ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024),
235              new RequestHandler(conn, channelGroup));
236          }
237        }).bind(port).syncUninterruptibly().channel();
238  }
239
240  public void join() {
241    serverChannel.closeFuture().awaitUninterruptibly();
242  }
243
244  public int port() {
245    if (serverChannel == null) {
246      return port;
247    } else {
248      return ((InetSocketAddress) serverChannel.localAddress()).getPort();
249    }
250  }
251
252  public void stop() throws IOException {
253    serverChannel.close().syncUninterruptibly();
254    serverChannel = null;
255    channelGroup.close().syncUninterruptibly();
256    channelGroup = null;
257    conn.close();
258    conn = null;
259  }
260
261  public static void main(String[] args) throws InterruptedException, ExecutionException {
262    int port = Integer.parseInt(args[0]);
263    HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port);
264    proxy.start();
265    proxy.join();
266  }
267}