001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
022
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Arrays;
027import java.util.HashSet;
028import java.util.Set;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.apache.hadoop.util.StringUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/** Creates multiple threads that write key/values into the */
042@InterfaceAudience.Private
043public class MultiThreadedWriter extends MultiThreadedWriterBase {
044  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class);
045
046  protected Set<HBaseWriterThread> writers = new HashSet<>();
047
048  protected boolean isMultiPut = false;
049
050  public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName)
051    throws IOException {
052    super(dataGen, conf, tableName, "W");
053  }
054
055  /** Use multi-puts vs. separate puts for every column in a row */
056  public void setMultiPut(boolean isMultiPut) {
057    this.isMultiPut = isMultiPut;
058  }
059
060  @Override
061  public void start(long startKey, long endKey, int numThreads) throws IOException {
062    super.start(startKey, endKey, numThreads);
063
064    if (verbose) {
065      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
066    }
067
068    createWriterThreads(numThreads);
069
070    startThreads(writers);
071  }
072
073  protected void createWriterThreads(int numThreads) throws IOException {
074    for (int i = 0; i < numThreads; ++i) {
075      HBaseWriterThread writer = new HBaseWriterThread(i);
076      Threads.setLoggingUncaughtExceptionHandler(writer);
077      writers.add(writer);
078    }
079  }
080
081  public class HBaseWriterThread extends Thread {
082    private final Table table;
083
084    public HBaseWriterThread(int writerId) throws IOException {
085      setName(getClass().getSimpleName() + "_" + writerId);
086      table = createTable();
087    }
088
089    protected Table createTable() throws IOException {
090      return connection.getTable(tableName);
091    }
092
093    @Override
094    public void run() {
095      try {
096        long rowKeyBase;
097        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
098        while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
099          byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
100          Put put = new Put(rowKey);
101          numKeys.addAndGet(1);
102          int columnCount = 0;
103          for (byte[] cf : columnFamilies) {
104            byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
105            for (byte[] column : columns) {
106              byte[] value = dataGenerator.generateValue(rowKey, cf, column);
107              put.addColumn(cf, column, value);
108              ++columnCount;
109              if (!isMultiPut) {
110                insert(table, put, rowKeyBase);
111                numCols.addAndGet(1);
112                put = new Put(rowKey);
113              }
114            }
115            long rowKeyHash = Arrays.hashCode(rowKey);
116            put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
117            put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
118            if (!isMultiPut) {
119              insert(table, put, rowKeyBase);
120              numCols.addAndGet(1);
121              put = new Put(rowKey);
122            }
123          }
124          if (isMultiPut) {
125            if (verbose) {
126              LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount
127                + " columns");
128            }
129            insert(table, put, rowKeyBase);
130            numCols.addAndGet(columnCount);
131          }
132          if (trackWroteKeys) {
133            wroteKeys.add(rowKeyBase);
134          }
135        }
136      } finally {
137        closeHTable();
138        numThreadsWorking.decrementAndGet();
139      }
140    }
141
142    public void insert(Table table, Put put, long keyBase) {
143      long start = EnvironmentEdgeManager.currentTime();
144      try {
145        put = (Put) dataGenerator.beforeMutate(keyBase, put);
146        table.put(put);
147        totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
148      } catch (IOException e) {
149        failedKeySet.add(keyBase);
150        String exceptionInfo;
151        if (e instanceof RetriesExhaustedWithDetailsException) {
152          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
153          exceptionInfo = aggEx.getExhaustiveDescription();
154        } else {
155          StringWriter stackWriter = new StringWriter();
156          PrintWriter pw = new PrintWriter(stackWriter);
157          e.printStackTrace(pw);
158          pw.flush();
159          exceptionInfo = StringUtils.stringifyException(e);
160        }
161        LOG.error("Failed to insert: " + keyBase + " after "
162          + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
163          + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo);
164      }
165    }
166
167    protected void closeHTable() {
168      try {
169        if (table != null) {
170          table.close();
171        }
172      } catch (IOException e) {
173        LOG.error("Error closing table", e);
174      }
175    }
176  }
177
178  @Override
179  public void waitForFinish() {
180    super.waitForFinish();
181    System.out.println("Failed to write keys: " + failedKeySet.size());
182    for (Long key : failedKeySet) {
183      System.out.println("Failed to write key: " + key);
184    }
185  }
186}