1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.example;
20
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.hbase.TableName;
26 import org.apache.hadoop.hbase.client.Connection;
27 import org.apache.hadoop.hbase.client.ConnectionFactory;
28 import org.apache.hadoop.hbase.client.Put;
29 import org.apache.hadoop.hbase.client.RegionLocator;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.util.Tool;
37 import org.apache.hadoop.util.ToolRunner;
38
39 import java.io.IOException;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.concurrent.Callable;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ForkJoinPool;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.ThreadFactory;
48 import java.util.concurrent.ThreadLocalRandom;
49 import java.util.concurrent.TimeUnit;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public class MultiThreadedClientExample extends Configured implements Tool {
90 private static final Log LOG = LogFactory.getLog(MultiThreadedClientExample.class);
91 private static final int DEFAULT_NUM_OPERATIONS = 500000;
92
93
94
95
96
97
98 private static final byte[] FAMILY = Bytes.toBytes("d");
99
100
101
102
103 private static final byte[] QUAL = Bytes.toBytes("test");
104
105 private final ExecutorService internalPool;
106
107 private final int threads;
108
109 public MultiThreadedClientExample() throws IOException {
110
111
112
113 this.threads = Runtime.getRuntime().availableProcessors() * 4;
114
115
116 ThreadFactory threadFactory = new ThreadFactoryBuilder()
117 .setDaemon(true).setNameFormat("internal-pol-%d").build();
118
119
120 this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
121 }
122
123 @Override
124 public int run(String[] args) throws Exception {
125
126 if (args.length < 1 || args.length > 2) {
127 System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
128 return -1;
129 }
130
131 final TableName tableName = TableName.valueOf(args[0]);
132 int numOperations = DEFAULT_NUM_OPERATIONS;
133
134
135 if (args.length == 2) {
136 numOperations = Integer.parseInt(args[1]);
137 }
138
139
140
141
142
143 ExecutorService service = new ForkJoinPool(threads * 2);
144
145
146
147 final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
148 final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);
149
150
151
152
153
154
155 warmUpConnectionCache(readConnection, tableName);
156 warmUpConnectionCache(writeConnection, tableName);
157
158 List<Future<Boolean>> futures = new ArrayList<>(numOperations);
159 for (int i = 0; i < numOperations; i++) {
160 double r = ThreadLocalRandom.current().nextDouble();
161 Future<Boolean> f;
162
163
164
165
166 if (r < .30) {
167 f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
168 } else if (r < .50) {
169 f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
170 } else {
171 f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName));
172 }
173 futures.add(f);
174 }
175
176
177 for (Future<Boolean> f : futures) {
178 f.get(10, TimeUnit.MINUTES);
179 }
180
181
182 internalPool.shutdownNow();
183 service.shutdownNow();
184 return 0;
185 }
186
187 private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
188 try (RegionLocator locator = connection.getRegionLocator(tn)) {
189 LOG.info(
190 "Warmed up region location cache for " + tn
191 + " got " + locator.getAllRegionLocations().size());
192 }
193 }
194
195
196
197
198 public static class WriteExampleCallable implements Callable<Boolean> {
199 private final Connection connection;
200 private final TableName tableName;
201
202 public WriteExampleCallable(Connection connection, TableName tableName) {
203 this.connection = connection;
204 this.tableName = tableName;
205 }
206
207 @Override
208 public Boolean call() throws Exception {
209
210
211
212 try (Table t = connection.getTable(tableName)) {
213 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
214 int rows = 30;
215
216
217 ArrayList<Put> puts = new ArrayList<>(rows);
218 for (int i = 0; i < 30; i++) {
219 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
220 Put p = new Put(rk);
221 p.addImmutable(FAMILY, QUAL, value);
222 puts.add(p);
223 }
224
225
226 t.put(puts);
227 }
228 return true;
229 }
230 }
231
232
233
234
235 public static class SingleWriteExampleCallable implements Callable<Boolean> {
236 private final Connection connection;
237 private final TableName tableName;
238
239 public SingleWriteExampleCallable(Connection connection, TableName tableName) {
240 this.connection = connection;
241 this.tableName = tableName;
242 }
243
244 @Override
245 public Boolean call() throws Exception {
246 try (Table t = connection.getTable(tableName)) {
247
248 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
249 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
250 Put p = new Put(rk);
251 p.addImmutable(FAMILY, QUAL, value);
252 t.put(p);
253 }
254 return true;
255 }
256 }
257
258
259
260
261
262 public static class ReadExampleCallable implements Callable<Boolean> {
263 private final Connection connection;
264 private final TableName tableName;
265
266 public ReadExampleCallable(Connection connection, TableName tableName) {
267 this.connection = connection;
268 this.tableName = tableName;
269 }
270
271 @Override
272 public Boolean call() throws Exception {
273
274
275 int result = 0;
276
277
278 int toRead = 100;
279 try (Table t = connection.getTable(tableName)) {
280 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
281 Scan s = new Scan(rk);
282
283
284
285
286 s.setFilter(new KeyOnlyFilter());
287
288
289
290
291
292
293
294 s.setCaching(20);
295
296
297
298 s.setCacheBlocks(false);
299
300
301 try (ResultScanner rs = t.getScanner(s)) {
302
303
304 for (Result r : rs) {
305
306 result += r.getRow().length;
307 toRead -= 1;
308
309
310
311
312
313 if (toRead <= 0) {
314 break;
315 }
316 }
317 }
318 }
319 return result > 0;
320 }
321 }
322
323 public static void main(String[] args) throws Exception {
324 ToolRunner.run(new MultiThreadedClientExample(), args);
325 }
326 }