001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
022
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Arrays;
027import java.util.HashSet;
028import java.util.Set;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.apache.hadoop.util.StringUtils;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/** Creates multiple threads that write key/values into the */
041public class MultiThreadedWriter extends MultiThreadedWriterBase {
042  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class);
043
044  protected Set<HBaseWriterThread> writers = new HashSet<>();
045
046  protected boolean isMultiPut = false;
047
048  public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName)
049    throws IOException {
050    super(dataGen, conf, tableName, "W");
051  }
052
053  /** Use multi-puts vs. separate puts for every column in a row */
054  public void setMultiPut(boolean isMultiPut) {
055    this.isMultiPut = isMultiPut;
056  }
057
058  @Override
059  public void start(long startKey, long endKey, int numThreads) throws IOException {
060    super.start(startKey, endKey, numThreads);
061
062    if (verbose) {
063      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
064    }
065
066    createWriterThreads(numThreads);
067
068    startThreads(writers);
069  }
070
071  protected void createWriterThreads(int numThreads) throws IOException {
072    for (int i = 0; i < numThreads; ++i) {
073      HBaseWriterThread writer = new HBaseWriterThread(i);
074      Threads.setLoggingUncaughtExceptionHandler(writer);
075      writers.add(writer);
076    }
077  }
078
079  public class HBaseWriterThread extends Thread {
080    private final Table table;
081
082    public HBaseWriterThread(int writerId) throws IOException {
083      setName(getClass().getSimpleName() + "_" + writerId);
084      table = createTable();
085    }
086
087    protected Table createTable() throws IOException {
088      return connection.getTable(tableName);
089    }
090
091    @Override
092    public void run() {
093      try {
094        long rowKeyBase;
095        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
096        while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
097          byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
098          Put put = new Put(rowKey);
099          numKeys.addAndGet(1);
100          int columnCount = 0;
101          for (byte[] cf : columnFamilies) {
102            byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
103            for (byte[] column : columns) {
104              byte[] value = dataGenerator.generateValue(rowKey, cf, column);
105              put.addColumn(cf, column, value);
106              ++columnCount;
107              if (!isMultiPut) {
108                insert(table, put, rowKeyBase);
109                numCols.addAndGet(1);
110                put = new Put(rowKey);
111              }
112            }
113            long rowKeyHash = Arrays.hashCode(rowKey);
114            put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
115            put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
116            if (!isMultiPut) {
117              insert(table, put, rowKeyBase);
118              numCols.addAndGet(1);
119              put = new Put(rowKey);
120            }
121          }
122          if (isMultiPut) {
123            if (verbose) {
124              LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount
125                + " columns");
126            }
127            insert(table, put, rowKeyBase);
128            numCols.addAndGet(columnCount);
129          }
130          if (trackWroteKeys) {
131            wroteKeys.add(rowKeyBase);
132          }
133        }
134      } finally {
135        closeHTable();
136        numThreadsWorking.decrementAndGet();
137      }
138    }
139
140    public void insert(Table table, Put put, long keyBase) {
141      long start = EnvironmentEdgeManager.currentTime();
142      try {
143        put = (Put) dataGenerator.beforeMutate(keyBase, put);
144        table.put(put);
145        totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
146      } catch (IOException e) {
147        failedKeySet.add(keyBase);
148        String exceptionInfo;
149        if (e instanceof RetriesExhaustedWithDetailsException) {
150          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
151          exceptionInfo = aggEx.getExhaustiveDescription();
152        } else {
153          StringWriter stackWriter = new StringWriter();
154          PrintWriter pw = new PrintWriter(stackWriter);
155          e.printStackTrace(pw);
156          pw.flush();
157          exceptionInfo = StringUtils.stringifyException(e);
158        }
159        LOG.error("Failed to insert: " + keyBase + " after "
160          + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
161          + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo);
162      }
163    }
164
165    protected void closeHTable() {
166      try {
167        if (table != null) {
168          table.close();
169        }
170      } catch (IOException e) {
171        LOG.error("Error closing table", e);
172      }
173    }
174  }
175
176  @Override
177  public void waitForFinish() {
178    super.waitForFinish();
179    System.out.println("Failed to write keys: " + failedKeySet.size());
180    for (Long key : failedKeySet) {
181      System.out.println("Failed to write key: " + key);
182    }
183  }
184}