/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/** Creates multiple threads that update key/values in an HBase table. */
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class);

  protected Set<HBaseUpdaterThread> updaters = new HashSet<>();

  private MultiThreadedWriterBase writer = null;
  private boolean isBatchUpdate = false;
  private boolean ignoreNonceConflicts = false;
  private final double updatePercent;

  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double updatePercent) throws IOException {
    super(dataGen, conf, tableName, "U");
    this.updatePercent = updatePercent;
  }
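
  // Illustrative usage sketch, not invoked by this class itself: a load test
  // would typically pair an updater with a writer so that updates trail the
  // initial writes. MultiThreadedWriter is assumed to be the sibling writer
  // class in this package; names and thread counts are examples only.
  //
  //   MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
  //   MultiThreadedUpdater updater = new MultiThreadedUpdater(dataGen, conf, tableName, 30);
  //   updater.linkToWriter(writer);   // updates wait for keys the writer confirms
  //   writer.start(0, keyCount, 20);
  //   updater.start(0, keyCount, 20);
  //   writer.waitForFinish();
  //   updater.waitForFinish();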

  /** Use one batched update per row instead of a separate update for every column. */
  public void setBatchUpdate(boolean isBatchUpdate) {
    this.isBatchUpdate = isBatchUpdate;
  }

  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);

    if (verbose) {
      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
    }

    addUpdaterThreads(numThreads);

    startThreads(updaters);
  }

  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
      updaters.add(updater);
    }
  }

  private long getNextKeyToUpdate() {
    if (writer == null) {
      return nextKeyToWrite.getAndIncrement();
    }
    synchronized (this) {
      if (nextKeyToWrite.get() >= endKey) {
        // Finished the whole key range
        return endKey;
      }
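      // The updater trails the linked writer: wait (ignoring interrupts) until
      // the writer confirms it has written the next key before updating it.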
      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
        Threads.sleepWithoutInterrupt(100);
      }
      long k = nextKeyToWrite.getAndIncrement();
      if (writer.failedToWriteKey(k)) {
        failedKeySet.add(k);
        return getNextKeyToUpdate();
      }
      return k;
    }
  }

  protected class HBaseUpdaterThread extends Thread {
    protected final Table table;

    public HBaseUpdaterThread(int updaterId) throws IOException {
      setName(getClass().getSimpleName() + "_" + updaterId);
      table = createTable();
    }

    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }

    @Override
    public void run() {
      try {
        long rowKeyBase;
        StringBuilder buf = new StringBuilder();
        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
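          // Update only a sampled fraction of rows: each row is mutated with a
          // probability of roughly updatePercent in 100.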
          if (RandomUtils.nextInt(0, 100) < updatePercent) {
            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
            Increment inc = new Increment(rowKey);
            Append app = new Append(rowKey);
            numKeys.addAndGet(1);
            int columnCount = 0;
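            // For each column family, bump the counter in the INCREMENT column
            // and append a record of every applied mutation to MUTATE_INFO, so
            // verification readers can reconstruct what happened to the row.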
            for (byte[] cf : columnFamilies) {
              long cfHash = Arrays.hashCode(cf);
              inc.addColumn(cf, INCREMENT, cfHash);
              buf.setLength(0); // Clear the buffer
              buf.append("#").append(Bytes.toString(INCREMENT));
              buf.append(":").append(MutationType.INCREMENT.getNumber());
              app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
              ++columnCount;
              if (!isBatchUpdate) {
                mutate(table, inc, rowKeyBase);
                numCols.addAndGet(1);
                inc = new Increment(rowKey);
                mutate(table, app, rowKeyBase);
                numCols.addAndGet(1);
                app = new Append(rowKey);
              }
              Get get = new Get(rowKey);
              get.addFamily(cf);
              try {
                get = dataGenerator.beforeGet(rowKeyBase, get);
              } catch (Exception e) {
                // Ideally this won't happen
                LOG.warn("Failed to modify the get from the load generator: row = ["
                    + Bytes.toString(get.getRow()) + "], column family = ["
                    + Bytes.toString(cf) + "]", e);
              }
              Result result = getRow(get, rowKeyBase, cf);
              Map<byte[], byte[]> columnValues =
                result != null ? result.getFamilyMap(cf) : null;
              if (columnValues == null) {
                int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
                if ((int) rowKeyBase % specialPermCellInsertionFactor == 0) {
                  LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
                } else {
                  failedKeySet.add(rowKeyBase);
                  LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
                      + "], since we could not get the original row");
                }
              } else {
                for (byte[] column : columnValues.keySet()) {
                  if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
                    continue;
                  }
                  MutationType mt = MutationType
                      .valueOf(RandomUtils.nextInt(0, MutationType.values().length));
                  long columnHash = Arrays.hashCode(column);
                  long hashCode = cfHash + columnHash;
                  byte[] hashCodeBytes = Bytes.toBytes(hashCode);
                  byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
                  if (hashCode % 2 == 0) {
                    Cell kv = result.getColumnLatestCell(cf, column);
                    checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
                    Preconditions.checkNotNull(checkedValue,
                        "Column value to be checked should not be null");
                  }
                  buf.setLength(0); // Clear the buffer
                  buf.append("#").append(Bytes.toString(column)).append(":");
                  ++columnCount;
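                  // Apply a randomly chosen mutation type to this column and
                  // record the choice ("#qualifier:type") in the MUTATE_INFO log.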
                  switch (mt) {
                  case PUT:
                    Put put = new Put(rowKey);
                    put.addColumn(cf, column, hashCodeBytes);
                    mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.PUT.getNumber());
                    break;
                  case DELETE:
                    Delete delete = new Delete(rowKey);
                    // Delete all versions, since a put could be applied
                    // multiple times if ChaosMonkey (CM) is in use
                    delete.addColumns(cf, column);
                    mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.DELETE.getNumber());
                    break;
                  default:
                    buf.append(MutationType.APPEND.getNumber());
                    app.addColumn(cf, column, hashCodeBytes);
                  }
                  app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
                  if (!isBatchUpdate) {
                    mutate(table, app, rowKeyBase);
                    numCols.addAndGet(1);
                    app = new Append(rowKey);
                  }
                }
              }
            }
            if (isBatchUpdate) {
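              // In batch mode, ship a single Increment and a single Append per
              // row, covering all of its column families at once.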
              if (verbose) {
                LOG.debug("Preparing increment and append for key = ["
                  + Bytes.toString(rowKey) + "], " + columnCount + " columns");
              }
              mutate(table, inc, rowKeyBase);
              mutate(table, app, rowKeyBase);
              numCols.addAndGet(columnCount);
            }
          }
          if (trackWroteKeys) {
            wroteKeys.add(rowKeyBase);
          }
        }
      } finally {
        closeHTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeHTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

    protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
      Result result = null;
      try {
        result = table.get(get);
      } catch (IOException ie) {
        LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
            + "], column family = [" + Bytes.toString(cf) + "]", ie);
      }
      return result;
    }

    public void mutate(Table table, Mutation m, long keyBase) {
      mutate(table, m, keyBase, null, null, null, null);
    }

    public void mutate(Table table, Mutation m,
        long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
      long start = System.currentTimeMillis();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        if (m instanceof Increment) {
          table.increment((Increment)m);
        } else if (m instanceof Append) {
          table.append((Append)m);
        } else if (m instanceof Put) {
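          // Conditional write: apply the Put (or the Delete below) only if
          // cf:q still holds the previously read value, so concurrent updaters
          // do not silently clobber each other.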
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
        } else if (m instanceof Delete) {
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
        } else {
          throw new IllegalArgumentException(
            "unsupported mutation " + m.getClass().getSimpleName());
        }
        totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
      } catch (IOException e) {
        if (ignoreNonceConflicts) {
          LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
          totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
          return;
        }
        failedKeySet.add(keyBase);
        String exceptionInfo;
        if (e instanceof RetriesExhaustedWithDetailsException) {
          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
          exceptionInfo = aggEx.getExhaustiveDescription();
        } else {
          exceptionInfo = StringUtils.stringifyException(e);
        }
        LOG.error("Failed to mutate: " + keyBase + " after "
            + (System.currentTimeMillis() - start) + "ms; region information: "
            + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
      }
    }
  }

  @Override
  public void waitForFinish() {
    super.waitForFinish();
    System.out.println("Failed to update " + failedKeySet.size() + " keys");
    for (Long key : failedKeySet) {
      System.out.println("Failed to update key: " + key);
    }
  }

  public void mutate(Table table, Mutation m, long keyBase) {
    mutate(table, m, keyBase, null, null, null, null);
  }

  public void mutate(Table table, Mutation m,
      long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
    long start = System.currentTimeMillis();
    try {
      m = dataGenerator.beforeMutate(keyBase, m);
      if (m instanceof Increment) {
        table.increment((Increment)m);
      } else if (m instanceof Append) {
        table.append((Append)m);
      } else if (m instanceof Put) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
      } else if (m instanceof Delete) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
      } else {
        throw new IllegalArgumentException(
          "unsupported mutation " + m.getClass().getSimpleName());
      }
      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
    } catch (IOException e) {
      failedKeySet.add(keyBase);
      String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
        exceptionInfo = aggEx.getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
          + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
          + exceptionInfo);
    }
  }

  public void setIgnoreNonceConflicts(boolean value) {
    this.ignoreNonceConflicts = value;
  }
}