001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.util;
020
021import java.io.IOException;
022import java.util.PriorityQueue;
023import java.util.Queue;
024import java.util.Set;
025import java.util.concurrent.ArrayBlockingQueue;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.ConcurrentSkipListSet;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/** Creates multiple threads that write key/values into the */
040public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
041  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);
042
043  /**
044   * A temporary place to keep track of inserted/updated keys. This is written to by
045   * all writers and is drained on a separate thread that populates
046   * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
047   * being inserted/updated. This queue is supposed to stay small.
048   */
049  protected BlockingQueue<Long> wroteKeys;
050
051  /**
052   * This is the current key to be inserted/updated by any thread. Each thread does an
053   * atomic get and increment operation and inserts the current value.
054   */
055  protected AtomicLong nextKeyToWrite = new AtomicLong();
056
057  /**
058   * The highest key in the contiguous range of keys .
059   */
060  protected AtomicLong wroteUpToKey = new AtomicLong();
061
062  /** The sorted set of keys NOT inserted/updated by the writers */
063  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();
064
065  /**
066   * The total size of the temporary inserted/updated key set that have not yet lined
067   * up in a our contiguous sequence starting from startKey. Supposed to stay
068   * small.
069   */
070  protected AtomicLong wroteKeyQueueSize = new AtomicLong();
071
072  /** Enable this if used in conjunction with a concurrent reader. */
073  protected boolean trackWroteKeys;
074
075  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
076      TableName tableName, String actionLetter) throws IOException {
077    super(dataGen, conf, tableName, actionLetter);
078    this.wroteKeys = createWriteKeysQueue(conf);
079  }
080
081  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
082    return new ArrayBlockingQueue<>(10000);
083  }
084
085  @Override
086  public void start(long startKey, long endKey, int numThreads) throws IOException {
087    super.start(startKey, endKey, numThreads);
088    nextKeyToWrite.set(startKey);
089    wroteUpToKey.set(startKey - 1);
090
091    if (trackWroteKeys) {
092      new Thread(new WroteKeysTracker(),
093          "MultiThreadedWriterBase-WroteKeysTracker-" + System.currentTimeMillis()).start();
094      numThreadsWorking.incrementAndGet();
095    }
096  }
097
098  protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
099    HRegionLocation cached = null, real = null;
100    try {
101      cached = connection.getRegionLocation(tableName, rowKey, false);
102      real = connection.getRegionLocation(tableName, rowKey, true);
103    } catch (Throwable t) {
104      // Cannot obtain region information for another catch block - too bad!
105    }
106    String result = "no information can be obtained";
107    if (cached != null) {
108      result = "cached: " + cached.toString();
109    }
110    if (real != null && real.getServerName() != null) {
111      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
112        result += "; cache is up to date";
113      } else {
114        result = (cached != null) ? (result + "; ") : "";
115        result += "real: " + real.toString();
116      }
117    }
118    return result;
119  }
120
121  /**
122   * A thread that keeps track of the highest key in the contiguous range of
123   * inserted/updated keys.
124   */
125  private class WroteKeysTracker implements Runnable {
126
127    @Override
128    public void run() {
129      Thread.currentThread().setName(getClass().getSimpleName());
130      try {
131        long expectedKey = startKey;
132        Queue<Long> sortedKeys = new PriorityQueue<>();
133        while (expectedKey < endKey) {
134          // Block until a new element is available.
135          Long k;
136          try {
137            k = wroteKeys.poll(1, TimeUnit.SECONDS);
138          } catch (InterruptedException e) {
139            LOG.info("Inserted key tracker thread interrupted", e);
140            break;
141          }
142          if (k == null) {
143            continue;
144          }
145          if (k == expectedKey) {
146            // Skip the "sorted key" queue and consume this key.
147            wroteUpToKey.set(k);
148            ++expectedKey;
149          } else {
150            sortedKeys.add(k);
151          }
152
153          // See if we have a sequence of contiguous keys lined up.
154          while (!sortedKeys.isEmpty()
155              && ((k = sortedKeys.peek()) == expectedKey)) {
156            sortedKeys.poll();
157            wroteUpToKey.set(k);
158            ++expectedKey;
159          }
160
161          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
162        }
163      } catch (Exception ex) {
164        LOG.error("Error in inserted/updaed key tracker", ex);
165      } finally {
166        numThreadsWorking.decrementAndGet();
167      }
168    }
169  }
170
171  public int getNumWriteFailures() {
172    return failedKeySet.size();
173  }
174
175  /**
176   * The max key until which all keys have been inserted/updated (successfully or not).
177   * @return the last key that we have inserted/updated all keys up to (inclusive)
178   */
179  public long wroteUpToKey() {
180    return wroteUpToKey.get();
181  }
182
183  public boolean failedToWriteKey(long k) {
184    return failedKeySet.contains(k);
185  }
186
187  @Override
188  protected String progressInfo() {
189    StringBuilder sb = new StringBuilder();
190    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
191    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
192    return sb.toString();
193  }
194
195  /**
196   * Used for a joint write/read workload. Enables tracking the last inserted/updated
197   * key, which requires a blocking queue and a consumer thread.
198   * @param enable whether to enable tracking the last inserted/updated key
199   */
200  public void setTrackWroteKeys(boolean enable) {
201    trackWroteKeys = enable;
202  }
203}