001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.example; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.util.Optional; 025import java.util.concurrent.ExecutionException; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseConfiguration; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.AsyncConnection; 030import org.apache.hadoop.hbase.client.ConnectionFactory; 031import org.apache.hadoop.hbase.client.Get; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.yetus.audience.InterfaceAudience; 036 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 039import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 040import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 041import org.apache.hbase.thirdparty.io.netty.channel.Channel; 042import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 043import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 044import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 045import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 046import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 047import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 048import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 049import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 050import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; 051import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 052import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse; 053import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest; 054import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames; 055import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator; 056import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus; 057import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec; 058import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion; 059import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder; 060import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 061 062/** 063 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully 064 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with 065 * the HTTP server. 066 * <p> 067 * The request URL is: 068 * 069 * <pre> 070 * http://<host>:<port>/<table>/<rowgt;/<family>:<qualifier> 071 * </pre> 072 * 073 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content 074 * when doing PUT. 075 */ 076@InterfaceAudience.Private 077public class HttpProxyExample { 078 079 private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); 080 081 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); 082 083 private final Configuration conf; 084 085 private final int port; 086 087 private AsyncConnection conn; 088 089 private Channel serverChannel; 090 091 private ChannelGroup channelGroup; 092 093 public HttpProxyExample(Configuration conf, int port) { 094 this.conf = conf; 095 this.port = port; 096 } 097 098 private static final class Params { 099 public final String table; 100 101 public final String row; 102 103 public final String family; 104 105 public final String qualifier; 106 107 public Params(String table, String row, String family, String qualifier) { 108 this.table = table; 109 this.row = row; 110 this.family = family; 111 this.qualifier = qualifier; 112 } 113 } 114 115 private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { 116 117 private final AsyncConnection conn; 118 119 private final ChannelGroup channelGroup; 120 121 public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) { 122 this.conn = conn; 123 this.channelGroup = channelGroup; 124 } 125 126 @Override 127 public void channelActive(ChannelHandlerContext ctx) { 128 channelGroup.add(ctx.channel()); 129 ctx.fireChannelActive(); 130 } 131 132 @Override 133 public void channelInactive(ChannelHandlerContext ctx) { 134 channelGroup.remove(ctx.channel()); 135 ctx.fireChannelInactive(); 136 } 137 138 private void write(ChannelHandlerContext ctx, HttpResponseStatus status, 139 Optional<String> content) { 140 DefaultFullHttpResponse resp; 141 if (content.isPresent()) { 142 ByteBuf buf = 143 ctx.alloc().buffer().writeBytes(Bytes.toBytes(content.get())); 144 resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf); 145 resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); 146 } else { 147 resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); 148 } 149 resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8"); 150 ctx.writeAndFlush(resp); 151 } 152 153 private Params parse(FullHttpRequest req) { 154 String[] components = new QueryStringDecoder(req.uri()).path().split("/"); 155 Preconditions.checkArgument(components.length == 4, "Unrecognized uri: %s", req.uri()); 156 // path is start with '/' so split will give an empty component 157 String[] cfAndCq = components[3].split(":"); 158 Preconditions.checkArgument(cfAndCq.length == 2, "Unrecognized uri: %s", req.uri()); 159 return new Params(components[1], components[2], cfAndCq[0], cfAndCq[1]); 160 } 161 162 private void get(ChannelHandlerContext ctx, FullHttpRequest req) { 163 Params params = parse(req); 164 addListener( 165 conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) 166 .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))), 167 (r, e) -> { 168 if (e != null) { 169 exceptionCaught(ctx, e); 170 } else { 171 byte[] value = 172 r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)); 173 if (value != null) { 174 write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value))); 175 } else { 176 write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty()); 177 } 178 } 179 }); 180 } 181 182 private void put(ChannelHandlerContext ctx, FullHttpRequest req) { 183 Params params = parse(req); 184 byte[] value = new byte[req.content().readableBytes()]; 185 req.content().readBytes(value); 186 addListener( 187 conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) 188 .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)), 189 (r, e) -> { 190 if (e != null) { 191 exceptionCaught(ctx, e); 192 } else { 193 write(ctx, HttpResponseStatus.OK, Optional.empty()); 194 } 195 }); 196 } 197 198 @Override 199 protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { 200 switch (req.method().name()) { 201 case "GET": 202 get(ctx, req); 203 break; 204 case "PUT": 205 put(ctx, req); 206 break; 207 default: 208 write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, Optional.empty()); 209 break; 210 } 211 } 212 213 @Override 214 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 215 if (cause instanceof IllegalArgumentException) { 216 write(ctx, HttpResponseStatus.BAD_REQUEST, Optional.of(cause.getMessage())); 217 } else { 218 write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, 219 Optional.of(Throwables.getStackTraceAsString(cause))); 220 } 221 } 222 } 223 224 public void start() throws InterruptedException, ExecutionException { 225 NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class); 226 conn = ConnectionFactory.createAsyncConnection(conf).get(); 227 channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 228 serverChannel = new ServerBootstrap().group(bossGroup, workerGroup) 229 .channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true) 230 .childHandler(new ChannelInitializer<Channel>() { 231 232 @Override 233 protected void initChannel(Channel ch) throws Exception { 234 ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024), 235 new RequestHandler(conn, channelGroup)); 236 } 237 }).bind(port).syncUninterruptibly().channel(); 238 } 239 240 public void join() { 241 serverChannel.closeFuture().awaitUninterruptibly(); 242 } 243 244 public int port() { 245 if (serverChannel == null) { 246 return port; 247 } else { 248 return ((InetSocketAddress) serverChannel.localAddress()).getPort(); 249 } 250 } 251 252 public void stop() throws IOException { 253 serverChannel.close().syncUninterruptibly(); 254 serverChannel = null; 255 channelGroup.close().syncUninterruptibly(); 256 channelGroup = null; 257 conn.close(); 258 conn = null; 259 } 260 261 public static void main(String[] args) throws InterruptedException, ExecutionException { 262 int port = Integer.parseInt(args[0]); 263 HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port); 264 proxy.start(); 265 proxy.join(); 266 } 267}