001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.example;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ForkJoinPool;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.conf.Configured;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.Cell.Type;
035import org.apache.hadoop.hbase.CellBuilderFactory;
036import org.apache.hadoop.hbase.CellBuilderType;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.RegionLocator;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.util.Tool;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056/**
057 * Example on how to use HBase's {@link Connection} and {@link Table} in a
058 * multi-threaded environment. Each table is a light weight object
059 * that is created and thrown away. Connections are heavy weight objects
060 * that hold on to zookeeper connections, async processes, and other state.
061 *
062 * <pre>
063 * Usage:
064 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000
065 * </pre>
066 *
067 * <p>
068 * The table should already be created before running the command.
069 * This example expects one column family named d.
070 * </p>
071 * <p>
072 * This is meant to show different operations that are likely to be
073 * done in a real world application. These operations are:
074 * </p>
075 *
076 * <ul>
077 *   <li>
078 *     30% of all operations performed are batch writes.
079 *     30 puts are created and sent out at a time.
080 *     The response for all puts is waited on.
081 *   </li>
082 *   <li>
083 *     20% of all operations are single writes.
084 *     A single put is sent out and the response is waited for.
085 *   </li>
086 *   <li>
087 *     50% of all operations are scans.
088 *     These scans start at a random place and scan up to 100 rows.
089 *   </li>
090 * </ul>
091 *
092 */
093@InterfaceAudience.Private
094public class MultiThreadedClientExample extends Configured implements Tool {
095  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class);
096  private static final int DEFAULT_NUM_OPERATIONS = 500000;
097
098  /**
099   * The name of the column family.
100   *
101   * d for default.
102   */
103  private static final byte[] FAMILY = Bytes.toBytes("d");
104
105  /**
106   * For the example we're just using one qualifier.
107   */
108  private static final byte[] QUAL = Bytes.toBytes("test");
109
110  private final ExecutorService internalPool;
111
112  private final int threads;
113
114  public MultiThreadedClientExample() throws IOException {
115    // Base number of threads.
116    // This represents the number of threads you application has
117    // that can be interacting with an hbase client.
118    this.threads = Runtime.getRuntime().availableProcessors() * 4;
119
120    // Daemon threads are great for things that get shut down.
121    ThreadFactory threadFactory = new ThreadFactoryBuilder()
122        .setDaemon(true).setNameFormat("internal-pol-%d").build();
123
124
125    this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
126  }
127
128  @Override
129  public int run(String[] args) throws Exception {
130
131    if (args.length < 1 || args.length > 2) {
132      System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
133      return -1;
134    }
135
136    final TableName tableName = TableName.valueOf(args[0]);
137    int numOperations = DEFAULT_NUM_OPERATIONS;
138
139    // the second arg is the number of operations to send.
140    if (args.length == 2) {
141      numOperations = Integer.parseInt(args[1]);
142    }
143
144    // Threads for the client only.
145    //
146    // We don't want to mix hbase and business logic.
147    //
148    ExecutorService service = new ForkJoinPool(threads * 2);
149
150    // Create two different connections showing how it's possible to
151    // separate different types of requests onto different connections
152    final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
153    final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);
154
155    // At this point the entire cache for the region locations is full.
156    // Only do this if the number of regions in a table is easy to fit into memory.
157    //
158    // If you are interacting with more than 25k regions on a client then it's probably not good
159    // to do this at all.
160    warmUpConnectionCache(readConnection, tableName);
161    warmUpConnectionCache(writeConnection, tableName);
162
163    List<Future<Boolean>> futures = new ArrayList<>(numOperations);
164    for (int i = 0; i < numOperations; i++) {
165      double r = ThreadLocalRandom.current().nextDouble();
166      Future<Boolean> f;
167
168      // For the sake of generating some synthetic load this queues
169      // some different callables.
170      // These callables are meant to represent real work done by your application.
171      if (r < .30) {
172        f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
173      } else if (r < .50) {
174        f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
175      } else {
176        f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName));
177      }
178      futures.add(f);
179    }
180
181    // Wait a long time for all the reads/writes to complete
182    for (Future<Boolean> f : futures) {
183      f.get(10, TimeUnit.MINUTES);
184    }
185
186    // Clean up after our selves for cleanliness
187    internalPool.shutdownNow();
188    service.shutdownNow();
189    return 0;
190  }
191
192  private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
193    try (RegionLocator locator = connection.getRegionLocator(tn)) {
194      LOG.info(
195          "Warmed up region location cache for " + tn
196              + " got " + locator.getAllRegionLocations().size());
197    }
198  }
199
200  /**
201   * Class that will show how to send batches of puts at the same time.
202   */
203  public static class WriteExampleCallable implements Callable<Boolean> {
204    private final Connection connection;
205    private final TableName tableName;
206
207    public WriteExampleCallable(Connection connection, TableName tableName) {
208      this.connection = connection;
209      this.tableName = tableName;
210    }
211
212    @Override
213    public Boolean call() throws Exception {
214
215      // Table implements Closable so we use the try with resource structure here.
216      // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
217      try (Table t = connection.getTable(tableName)) {
218        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
219        int rows = 30;
220
221        // Array to put the batch
222        ArrayList<Put> puts = new ArrayList<>(rows);
223        for (int i = 0; i < 30; i++) {
224          byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
225          Put p = new Put(rk);
226          p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
227                .setRow(rk)
228                .setFamily(FAMILY)
229                .setQualifier(QUAL)
230                .setTimestamp(p.getTimestamp())
231                .setType(Cell.Type.Put)
232                .setValue(value)
233                .build());
234          puts.add(p);
235        }
236
237        // now that we've assembled the batch it's time to push it to hbase.
238        t.put(puts);
239      }
240      return true;
241    }
242  }
243
244  /**
245   * Class to show how to send a single put.
246   */
247  public static class SingleWriteExampleCallable implements Callable<Boolean> {
248    private final Connection connection;
249    private final TableName tableName;
250
251    public SingleWriteExampleCallable(Connection connection, TableName tableName) {
252      this.connection = connection;
253      this.tableName = tableName;
254    }
255
256    @Override
257    public Boolean call() throws Exception {
258      try (Table t = connection.getTable(tableName)) {
259
260        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
261        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
262        Put p = new Put(rk);
263        p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
264                .setRow(rk)
265                .setFamily(FAMILY)
266                .setQualifier(QUAL)
267                .setTimestamp(p.getTimestamp())
268                .setType(Type.Put)
269                .setValue(value)
270                .build());
271        t.put(p);
272      }
273      return true;
274    }
275  }
276
277
278  /**
279   * Class to show how to scan some rows starting at a random location.
280   */
281  public static class ReadExampleCallable implements Callable<Boolean> {
282    private final Connection connection;
283    private final TableName tableName;
284
285    public ReadExampleCallable(Connection connection, TableName tableName) {
286      this.connection = connection;
287      this.tableName = tableName;
288    }
289
290    @Override
291    public Boolean call() throws Exception {
292
293      // total length in bytes of all read rows.
294      int result = 0;
295
296      // Number of rows the scan will read before being considered done.
297      int toRead = 100;
298      try (Table t = connection.getTable(tableName)) {
299        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
300        Scan s = new Scan().withStartRow(rk);
301
302        // This filter will keep the values from being sent accross the wire.
303        // This is good for counting or other scans that are checking for
304        // existence and don't rely on the value.
305        s.setFilter(new KeyOnlyFilter());
306
307        // Don't go back to the server for every single row.
308        // We know these rows are small. So ask for 20 at a time.
309        // This would be application specific.
310        //
311        // The goal is to reduce round trips but asking for too
312        // many rows can lead to GC problems on client and server sides.
313        s.setCaching(20);
314
315        // Don't use the cache. While this is a silly test program it's still good to be
316        // explicit that scans normally don't use the block cache.
317        s.setCacheBlocks(false);
318
319        // Open up the scanner and close it automatically when done.
320        try (ResultScanner rs = t.getScanner(s)) {
321
322          // Now go through rows.
323          for (Result r : rs) {
324            // Keep track of things size to simulate doing some real work.
325            result += r.getRow().length;
326            toRead -= 1;
327
328            // Most online applications won't be
329            // reading the entire table so this break
330            // simulates small to medium size scans,
331            // without needing to know an end row.
332            if (toRead <= 0)  {
333              break;
334            }
335          }
336        }
337      }
338      return result > 0;
339    }
340  }
341
342  public static void main(String[] args) throws Exception {
343    ToolRunner.run(new MultiThreadedClientExample(), args);
344  }
345}