001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.util;
020
021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
022import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
023
024import java.io.IOException;
025import java.io.PrintWriter;
026import java.io.StringWriter;
027import java.util.Arrays;
028import java.util.HashSet;
029import java.util.Set;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
038import org.apache.hadoop.util.StringUtils;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/** Creates multiple threads that write key/values into the */
043public class MultiThreadedWriter extends MultiThreadedWriterBase {
044  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class);
045
046  protected Set<HBaseWriterThread> writers = new HashSet<>();
047
048  protected boolean isMultiPut = false;
049
050  public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
051      TableName tableName) throws IOException {
052    super(dataGen, conf, tableName, "W");
053  }
054
055  /** Use multi-puts vs. separate puts for every column in a row */
056  public void setMultiPut(boolean isMultiPut) {
057    this.isMultiPut = isMultiPut;
058  }
059
060  @Override
061  public void start(long startKey, long endKey, int numThreads) throws IOException {
062    super.start(startKey, endKey, numThreads);
063
064    if (verbose) {
065      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
066    }
067
068    createWriterThreads(numThreads);
069
070    startThreads(writers);
071  }
072
073  protected void createWriterThreads(int numThreads) throws IOException {
074    for (int i = 0; i < numThreads; ++i) {
075      HBaseWriterThread writer = new HBaseWriterThread(i);
076      Threads.setLoggingUncaughtExceptionHandler(writer);
077      writers.add(writer);
078    }
079  }
080
081  public class HBaseWriterThread extends Thread {
082    private final Table table;
083
084    public HBaseWriterThread(int writerId) throws IOException {
085      setName(getClass().getSimpleName() + "_" + writerId);
086      table = createTable();
087    }
088
089    protected Table createTable() throws IOException {
090      return connection.getTable(tableName);
091    }
092
093    @Override
094    public void run() {
095      try {
096        long rowKeyBase;
097        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
098        while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
099          byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
100          Put put = new Put(rowKey);
101          numKeys.addAndGet(1);
102          int columnCount = 0;
103          for (byte[] cf : columnFamilies) {
104            byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
105            for (byte[] column : columns) {
106              byte[] value = dataGenerator.generateValue(rowKey, cf, column);
107              put.addColumn(cf, column, value);
108              ++columnCount;
109              if (!isMultiPut) {
110                insert(table, put, rowKeyBase);
111                numCols.addAndGet(1);
112                put = new Put(rowKey);
113              }
114            }
115            long rowKeyHash = Arrays.hashCode(rowKey);
116            put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
117            put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
118            if (!isMultiPut) {
119              insert(table, put, rowKeyBase);
120              numCols.addAndGet(1);
121              put = new Put(rowKey);
122            }
123          }
124          if (isMultiPut) {
125            if (verbose) {
126              LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount + " columns");
127            }
128            insert(table, put, rowKeyBase);
129            numCols.addAndGet(columnCount);
130          }
131          if (trackWroteKeys) {
132            wroteKeys.add(rowKeyBase);
133          }
134        }
135      } finally {
136        closeHTable();
137        numThreadsWorking.decrementAndGet();
138      }
139    }
140
141    public void insert(Table table, Put put, long keyBase) {
142      long start = System.currentTimeMillis();
143      try {
144        put = (Put) dataGenerator.beforeMutate(keyBase, put);
145        table.put(put);
146        totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
147      } catch (IOException e) {
148        failedKeySet.add(keyBase);
149        String exceptionInfo;
150        if (e instanceof RetriesExhaustedWithDetailsException) {
151          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
152          exceptionInfo = aggEx.getExhaustiveDescription();
153        } else {
154          StringWriter stackWriter = new StringWriter();
155          PrintWriter pw = new PrintWriter(stackWriter);
156          e.printStackTrace(pw);
157          pw.flush();
158          exceptionInfo = StringUtils.stringifyException(e);
159        }
160        LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
161            + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow())
162            + "; errors: " + exceptionInfo);
163      }
164    }
165    protected void closeHTable() {
166      try {
167        if (table != null) {
168          table.close();
169        }
170      } catch (IOException e) {
171        LOG.error("Error closing table", e);
172      }
173    }
174  }
175
176  @Override
177  public void waitForFinish() {
178    super.waitForFinish();
179    System.out.println("Failed to write keys: " + failedKeySet.size());
180    for (Long key : failedKeySet) {
181       System.out.println("Failed to write key: " + key);
182    }
183  }
184}