/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.example;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion;
import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;

/**
 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
 * the HTTP server.
 * <p>
 * The request URL is:
 *
 * <pre>
 * http://&lt;host&gt;:&lt;port&gt;/&lt;table&gt;/&lt;row&gt;/&lt;family&gt;:&lt;qualifier&gt;
 * </pre>
 *
 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content
 * when doing PUT.
 * <p>
 * Notice that, future class methods will all return a new Future, so you always have one future
 * that will not be checked, so we need to suppress error-prone "FutureReturnValueIgnored"
 * warnings on the methods such as join and stop. In your real production code, you should use your
 * own convenient way to address the warning.
 */
@InterfaceAudience.Private
public class HttpProxyExample {

  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

  private final EventLoopGroup workerGroup = new NioEventLoopGroup();

  private final Configuration conf;

  private final int port;

  private AsyncConnection conn;

  private Channel serverChannel;

  private ChannelGroup channelGroup;

  /**
   * Creates a proxy that is not yet listening; call {@link #start()} to bind it.
   * @param conf the configuration used to create the {@link AsyncConnection}
   * @param port the TCP port to bind the HTTP server to
   */
  public HttpProxyExample(Configuration conf, int port) {
    this.conf = conf;
    this.port = port;
  }

  /** The four URL components that address a single cell. */
  private static final class Params {
    public final String table;

    public final String row;

    public final String family;

    public final String qualifier;

    public Params(String table, String row, String family, String qualifier) {
      this.table = table;
      this.row = row;
      this.family = family;
      this.qualifier = qualifier;
    }
  }

  /**
   * Handles one aggregated HTTP request per call: GET reads a cell, PUT writes a cell, every other
   * method is answered with 405.
   */
  private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final AsyncConnection conn;

    private final ChannelGroup channelGroup;

    public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) {
      this.conn = conn;
      this.channelGroup = channelGroup;
    }

    /** Registers the client channel in the group so that it can be closed on {@code stop()}. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
      channelGroup.add(ctx.channel());
      ctx.fireChannelActive();
    }

    /** Removes the client channel from the group once it has gone inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
      channelGroup.remove(ctx.channel());
      ctx.fireChannelInactive();
    }

    /** Writes a response that carries only a status line and headers, no body. */
    private void write(ChannelHandlerContext ctx, HttpResponseStatus status) {
      write(ctx, status, null);
    }

    /**
     * Writes and flushes a complete HTTP/1.1 response.
     * @param ctx     the channel context to write to
     * @param status  the response status
     * @param content the response body, or {@code null} for an empty body
     */
    private void write(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
      DefaultFullHttpResponse resp;
      if (content != null) {
        ByteBuf buf = ctx.alloc().buffer().writeBytes(Bytes.toBytes(content));
        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf);
      } else {
        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
      }
      // Always declare the body length, including 0 for body-less replies. Without it a
      // keep-alive HTTP/1.1 client cannot tell where the response ends.
      resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
      resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
      safeWriteAndFlush(ctx, resp);
    }

    /**
     * Splits the request path {@code /<table>/<row>/<family>:<qualifier>} into its components.
     * @throws IllegalArgumentException if the path does not have exactly that shape
     */
    private Params parse(FullHttpRequest req) {
      List<String> components =
        Splitter.on('/').splitToList(new QueryStringDecoder(req.uri()).path());
      Preconditions.checkArgument(components.size() == 4, "Unrecognized uri: %s", req.uri());
      Iterator<String> i = components.iterator();
      // path is start with '/' so split will give an empty component
      i.next();
      String table = i.next();
      String row = i.next();
      List<String> cfAndCq = Splitter.on(':').splitToList(i.next());
      Preconditions.checkArgument(cfAndCq.size() == 2, "Unrecognized uri: %s", req.uri());
      i = cfAndCq.iterator();
      String family = i.next();
      String qualifier = i.next();
      return new Params(table, row, family, qualifier);
    }

    /**
     * Fetches the addressed cell asynchronously and replies 200 with its value, or 404 when the
     * cell has no value. Failures are routed to {@link #exceptionCaught}.
     */
    private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
      Params params = parse(req);
      addListener(
        conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
        (r, e) -> {
          if (e != null) {
            exceptionCaught(ctx, e);
          } else {
            byte[] value =
              r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
            if (value != null) {
              write(ctx, HttpResponseStatus.OK, Bytes.toStringBinary(value));
            } else {
              write(ctx, HttpResponseStatus.NOT_FOUND);
            }
          }
        });
    }

    /**
     * Stores the request content as the value of the addressed cell asynchronously and replies
     * 200 on success. Failures are routed to {@link #exceptionCaught}.
     */
    private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
      Params params = parse(req);
      // Copy the body out of the request buffer before the asynchronous call is issued.
      byte[] value = new byte[req.content().readableBytes()];
      req.content().readBytes(value);
      addListener(
        conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
        (r, e) -> {
          if (e != null) {
            exceptionCaught(ctx, e);
          } else {
            write(ctx, HttpResponseStatus.OK);
          }
        });
    }

    /** Dispatches on the HTTP method name; anything other than GET or PUT gets a 405. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
      switch (req.method().name()) {
        case "GET":
          get(ctx, req);
          break;
        case "PUT":
          put(ctx, req);
          break;
        default:
          write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
          break;
      }
    }

    /**
     * Maps {@link IllegalArgumentException} to 400 with the exception message as body, and any
     * other failure to 500 with the stack trace as body.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      if (cause instanceof IllegalArgumentException) {
        write(ctx, HttpResponseStatus.BAD_REQUEST, cause.getMessage());
      } else {
        write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
          Throwables.getStackTraceAsString(cause));
      }
    }
  }

  /**
   * Creates the {@link AsyncConnection} on the worker event loop group, then binds the HTTP server
   * to the configured port and waits for the bind to complete.
   * @throws InterruptedException if interrupted while waiting for the connection to be created
   * @throws ExecutionException   if creating the connection fails
   */
  public void start() throws InterruptedException, ExecutionException {
    // Make the HBase client use the same event loop group as the HTTP server.
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class);
    conn = ConnectionFactory.createAsyncConnection(conf).get();
    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // NOTE(review): SO_REUSEADDR is set via childOption (accepted channels) rather than option
    // (the listening socket); kept as in the original, confirm whether this is intended.
    serverChannel =
      new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_REUSEADDR, true)
        .childHandler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024),
              new RequestHandler(conn, channelGroup));
          }
        }).bind(port).syncUninterruptibly().channel();
  }

  /** Blocks the calling thread until the server channel has been closed. */
  @SuppressWarnings("FutureReturnValueIgnored")
  public void join() {
    serverChannel.closeFuture().awaitUninterruptibly();
  }

  /**
   * Returns the port of the bound server channel, or the port passed to the constructor when the
   * server is not running.
   */
  public int port() {
    if (serverChannel == null) {
      return port;
    } else {
      return ((InetSocketAddress) serverChannel.localAddress()).getPort();
    }
  }

  /**
   * Closes the server channel, all registered client channels and the {@link AsyncConnection}, in
   * that order. Resources that were never created (for example when {@link #start()} was not
   * called or failed part-way) are skipped, so the remaining ones are still released.
   * <p>
   * NOTE(review): the boss and worker event loop groups are not shut down here.
   * @throws IOException if closing the connection fails
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public void stop() throws IOException {
    if (serverChannel != null) {
      serverChannel.close().syncUninterruptibly();
      serverChannel = null;
    }
    if (channelGroup != null) {
      channelGroup.close().syncUninterruptibly();
      channelGroup = null;
    }
    if (conn != null) {
      conn.close();
      conn = null;
    }
  }

  /**
   * Starts a proxy on the port given as the first command line argument and blocks until the
   * server channel is closed.
   */
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    int port = Integer.parseInt(args[0]);
    HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port);
    proxy.start();
    proxy.join();
  }
}