/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Example on how to use HBase's {@link Connection} and {@link Table} in a multi-threaded
 * environment. Each table is a lightweight object that is created and thrown away. Connections
 * are heavyweight objects that hold on to ZooKeeper connections, async processes, and other
 * state.
 *
 * <pre>
 * Usage:
 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000
 * </pre>
 * <p>
 * The table should already be created before running the command. This example expects one column
 * family named d.
 * </p>
 * <p>
 * This is meant to show different operations that are likely to be done in a real world
 * application. These operations are:
 * </p>
 * <ul>
 * <li>30% of all operations performed are batch writes. 30 puts are created and sent out at a
 * time. The response for all puts is waited on.</li>
 * <li>20% of all operations are single writes. A single put is sent out and the response is
 * waited for.</li>
 * <li>50% of all operations are scans. These scans start at a random place and scan up to 100
 * rows.
 * </li>
 * </ul>
 */
@InterfaceAudience.Private
public class MultiThreadedClientExample extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class);
  private static final int DEFAULT_NUM_OPERATIONS = 500000;

  /**
   * The name of the column family. d for default.
   */
  private static final byte[] FAMILY = Bytes.toBytes("d");

  /**
   * For the example we're just using one qualifier.
   */
  private static final byte[] QUAL = Bytes.toBytes("test");

  private final ExecutorService internalPool;

  private final int threads;

  public MultiThreadedClientExample() throws IOException {
    // Base number of threads.
    // This represents the number of threads your application has
    // that can be interacting with an hbase client.
    this.threads = Runtime.getRuntime().availableProcessors() * 4;

    // Daemon threads are great for things that get shut down.
    ThreadFactory threadFactory =
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pool-%d").build();

    this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
  }

  @Override
  public int run(String[] args) throws Exception {

    if (args.length < 1 || args.length > 2) {
      System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
      return -1;
    }

    final TableName tableName = TableName.valueOf(args[0]);
    int numOperations = DEFAULT_NUM_OPERATIONS;

    // The second arg is the number of operations to send.
    if (args.length == 2) {
      numOperations = Integer.parseInt(args[1]);
    }

    // Threads for the client only.
    //
    // We don't want to mix hbase and business logic.
    //
    ExecutorService service = new ForkJoinPool(threads * 2);

    // Create two different connections showing how it's possible to
    // separate different types of requests onto different connections.
    final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
    final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);

    // Warm up the region location cache so it is fully populated before the load starts.
    // Only do this if the number of regions in a table is easy to fit into memory.
    //
    // If you are interacting with more than 25k regions on a client then it's probably not good
    // to do this at all.
    warmUpConnectionCache(readConnection, tableName);
    warmUpConnectionCache(writeConnection, tableName);

    List<Future<Boolean>> futures = new ArrayList<>(numOperations);
    for (int i = 0; i < numOperations; i++) {
      double r = ThreadLocalRandom.current().nextDouble();
      Future<Boolean> f;

      // For the sake of generating some synthetic load this queues
      // some different callables.
      // These callables are meant to represent real work done by your application.
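      //
      // The thresholds below implement the mix documented in the class javadoc:
      //   r <  0.30        -> batch write  (30%)
      //   0.30 <= r < 0.50 -> single write (20%)
      //   r >= 0.50        -> scan         (50%)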
      if (r < .30) {
        f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
      } else if (r < .50) {
        f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
      } else {
        f = internalPool.submit(new ReadExampleCallable(readConnection, tableName));
      }
      futures.add(f);
    }

    // Wait a long time for all the reads/writes to complete
    for (Future<Boolean> f : futures) {
      f.get(10, TimeUnit.MINUTES);
    }

    // Clean up after ourselves for cleanliness.
    writeConnection.close();
    readConnection.close();
    internalPool.shutdownNow();
    service.shutdownNow();
    return 0;
  }

  private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      LOG.info("Warmed up region location cache for " + tn + " got "
        + locator.getAllRegionLocations().size());
    }
  }

  /**
   * Class that will show how to send batches of puts at the same time.
   */
  public static class WriteExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public WriteExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    @Override
    public Boolean call() throws Exception {

      // Table implements Closeable so we use the try-with-resources structure here.
      // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
      try (Table t = connection.getTable(tableName)) {
        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
        int rows = 30;

        // List to hold the batch of puts.
        ArrayList<Put> puts = new ArrayList<>(rows);
        for (int i = 0; i < rows; i++) {
          byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
          Put p = new Put(rk);
          p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY)
            .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put)
            .setValue(value).build());
          puts.add(p);
        }

        // Now that we've assembled the batch it's time to push it to hbase.
        t.put(puts);
      }
      return true;
    }
  }

  /**
   * Class to show how to send a single put.
   */
  public static class SingleWriteExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public SingleWriteExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    @Override
    public Boolean call() throws Exception {
      try (Table t = connection.getTable(tableName)) {

        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
        Put p = new Put(rk);
        p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY)
          .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value)
          .build());
        t.put(p);
      }
      return true;
    }
  }

  /**
   * Class to show how to scan some rows starting at a random location.
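   * The scan applies a {@link KeyOnlyFilter} and stops after at most 100 rows, so only row keys
   * travel back over the wire.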
   */
  public static class ReadExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public ReadExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    @Override
    public Boolean call() throws Exception {

      // Total length in bytes of all row keys read.
      int result = 0;

      // Number of rows the scan will read before being considered done.
      int toRead = 100;
      try (Table t = connection.getTable(tableName)) {
        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
        Scan s = new Scan().withStartRow(rk);

        // This filter will keep the values from being sent across the wire.
        // This is good for counting or other scans that are checking for
        // existence and don't rely on the value.
        s.setFilter(new KeyOnlyFilter());

        // Don't go back to the server for every single row.
        // We know these rows are small. So ask for 20 at a time.
        // This would be application specific.
        //
        // The goal is to reduce round trips but asking for too
        // many rows can lead to GC problems on client and server sides.
        s.setCaching(20);

        // Don't use the block cache. While this is a silly test program it's still good to be
        // explicit that scans normally don't use the block cache.
        s.setCacheBlocks(false);

        // Open up the scanner and close it automatically when done.
        try (ResultScanner rs = t.getScanner(s)) {

          // Now go through rows.
          for (Result r : rs) {
            // Keep track of the size of things read to simulate doing some real work.
            result += r.getRow().length;
            toRead -= 1;

            // Most online applications won't be
            // reading the entire table so this break
            // simulates small to medium size scans,
            // without needing to know an end row.
            if (toRead <= 0) {
              break;
            }
          }
        }
      }
      return result > 0;
    }
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MultiThreadedClientExample(), args));
  }
}