001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.example;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import java.util.concurrent.Future;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.conf.Configured;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.ConnectionFactory;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.RegionLocator;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.util.Tool;
048import org.apache.hadoop.util.ToolRunner;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055/**
056 * Example on how to use HBase's {@link Connection} and {@link Table} in a multi-threaded
057 * environment. Each table is a light weight object that is created and thrown away. Connections are
058 * heavy weight objects that hold on to zookeeper connections, async processes, and other state.
059 *
060 * <pre>
061 * Usage:
062 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000
063 * </pre>
064 * <p>
065 * The table should already be created before running the command. This example expects one column
066 * family named d.
067 * </p>
068 * <p>
069 * This is meant to show different operations that are likely to be done in a real world
070 * application. These operations are:
071 * </p>
072 * <ul>
073 * <li>30% of all operations performed are batch writes. 30 puts are created and sent out at a time.
074 * The response for all puts is waited on.</li>
075 * <li>20% of all operations are single writes. A single put is sent out and the response is waited
076 * for.</li>
077 * <li>50% of all operations are scans. These scans start at a random place and scan up to 100 rows.
078 * </li>
079 * </ul>
080 */
081@InterfaceAudience.Private
082public class MultiThreadedClientExample extends Configured implements Tool {
083  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class);
084  private static final int DEFAULT_NUM_OPERATIONS = 500000;
085
086  /**
087   * The name of the column family. d for default.
088   */
089  private static final byte[] FAMILY = Bytes.toBytes("d");
090
091  /**
092   * For the example we're just using one qualifier.
093   */
094  private static final byte[] QUAL = Bytes.toBytes("test");
095
096  private final ExecutorService internalPool;
097
098  private final int threads;
099
100  public MultiThreadedClientExample() throws IOException {
101    // Base number of threads.
102    // This represents the number of threads you application has
103    // that can be interacting with an hbase client.
104    this.threads = Runtime.getRuntime().availableProcessors() * 4;
105
106    // Daemon threads are great for things that get shut down.
107    ThreadFactory threadFactory =
108      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build();
109
110    this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
111  }
112
113  @Override
114  public int run(String[] args) throws Exception {
115
116    if (args.length < 1 || args.length > 2) {
117      System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
118      return -1;
119    }
120
121    final TableName tableName = TableName.valueOf(args[0]);
122    int numOperations = DEFAULT_NUM_OPERATIONS;
123
124    // the second arg is the number of operations to send.
125    if (args.length == 2) {
126      numOperations = Integer.parseInt(args[1]);
127    }
128
129    // Threads for the client only.
130    //
131    // We don't want to mix hbase and business logic.
132    //
133    ThreadPoolExecutor service = new ThreadPoolExecutor(threads * 2, threads * 2, 60L,
134      TimeUnit.SECONDS, new LinkedBlockingQueue<>());
135
136    // Create two different connections showing how it's possible to
137    // separate different types of requests onto different connections
138    final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
139    final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);
140
141    // At this point the entire cache for the region locations is full.
142    // Only do this if the number of regions in a table is easy to fit into memory.
143    //
144    // If you are interacting with more than 25k regions on a client then it's probably not good
145    // to do this at all.
146    warmUpConnectionCache(readConnection, tableName);
147    warmUpConnectionCache(writeConnection, tableName);
148
149    List<Future<Boolean>> futures = new ArrayList<>(numOperations);
150    for (int i = 0; i < numOperations; i++) {
151      double r = ThreadLocalRandom.current().nextDouble();
152      Future<Boolean> f;
153
154      // For the sake of generating some synthetic load this queues
155      // some different callables.
156      // These callables are meant to represent real work done by your application.
157      if (r < .30) {
158        f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
159      } else if (r < .50) {
160        f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
161      } else {
162        f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName));
163      }
164      futures.add(f);
165    }
166
167    // Wait a long time for all the reads/writes to complete
168    for (Future<Boolean> f : futures) {
169      f.get(10, TimeUnit.MINUTES);
170    }
171
172    // Clean up after our selves for cleanliness
173    internalPool.shutdownNow();
174    service.shutdownNow();
175    return 0;
176  }
177
178  private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
179    try (RegionLocator locator = connection.getRegionLocator(tn)) {
180      LOG.info("Warmed up region location cache for " + tn + " got "
181        + locator.getAllRegionLocations().size());
182    }
183  }
184
185  /**
186   * Class that will show how to send batches of puts at the same time.
187   */
188  public static class WriteExampleCallable implements Callable<Boolean> {
189    private final Connection connection;
190    private final TableName tableName;
191
192    public WriteExampleCallable(Connection connection, TableName tableName) {
193      this.connection = connection;
194      this.tableName = tableName;
195    }
196
197    @Override
198    public Boolean call() throws Exception {
199
200      // Table implements Closable so we use the try with resource structure here.
201      // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
202      try (Table t = connection.getTable(tableName)) {
203        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
204        int rows = 30;
205
206        // Array to put the batch
207        ArrayList<Put> puts = new ArrayList<>(rows);
208        for (int i = 0; i < 30; i++) {
209          byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
210          Put p = new Put(rk);
211          p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY)
212            .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put)
213            .setValue(value).build());
214          puts.add(p);
215        }
216
217        // now that we've assembled the batch it's time to push it to hbase.
218        t.put(puts);
219      }
220      return true;
221    }
222  }
223
224  /**
225   * Class to show how to send a single put.
226   */
227  public static class SingleWriteExampleCallable implements Callable<Boolean> {
228    private final Connection connection;
229    private final TableName tableName;
230
231    public SingleWriteExampleCallable(Connection connection, TableName tableName) {
232      this.connection = connection;
233      this.tableName = tableName;
234    }
235
236    @Override
237    public Boolean call() throws Exception {
238      try (Table t = connection.getTable(tableName)) {
239
240        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
241        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
242        Put p = new Put(rk);
243        p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY)
244          .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value)
245          .build());
246        t.put(p);
247      }
248      return true;
249    }
250  }
251
252  /**
253   * Class to show how to scan some rows starting at a random location.
254   */
255  public static class ReadExampleCallable implements Callable<Boolean> {
256    private final Connection connection;
257    private final TableName tableName;
258
259    public ReadExampleCallable(Connection connection, TableName tableName) {
260      this.connection = connection;
261      this.tableName = tableName;
262    }
263
264    @Override
265    public Boolean call() throws Exception {
266
267      // total length in bytes of all read rows.
268      int result = 0;
269
270      // Number of rows the scan will read before being considered done.
271      int toRead = 100;
272      try (Table t = connection.getTable(tableName)) {
273        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
274        Scan s = new Scan().withStartRow(rk);
275
276        // This filter will keep the values from being sent accross the wire.
277        // This is good for counting or other scans that are checking for
278        // existence and don't rely on the value.
279        s.setFilter(new KeyOnlyFilter());
280
281        // Don't go back to the server for every single row.
282        // We know these rows are small. So ask for 20 at a time.
283        // This would be application specific.
284        //
285        // The goal is to reduce round trips but asking for too
286        // many rows can lead to GC problems on client and server sides.
287        s.setCaching(20);
288
289        // Don't use the cache. While this is a silly test program it's still good to be
290        // explicit that scans normally don't use the block cache.
291        s.setCacheBlocks(false);
292
293        // Open up the scanner and close it automatically when done.
294        try (ResultScanner rs = t.getScanner(s)) {
295
296          // Now go through rows.
297          for (Result r : rs) {
298            // Keep track of things size to simulate doing some real work.
299            result += r.getRow().length;
300            toRead -= 1;
301
302            // Most online applications won't be
303            // reading the entire table so this break
304            // simulates small to medium size scans,
305            // without needing to know an end row.
306            if (toRead <= 0) {
307              break;
308            }
309          }
310        }
311      }
312      return result > 0;
313    }
314  }
315
316  public static void main(String[] args) throws Exception {
317    ToolRunner.run(new MultiThreadedClientExample(), args);
318  }
319}