001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
022import org.apache.hadoop.util.StringUtils;
023import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparator;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MemoryCompactionPolicy;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.ClassSize;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.wal.WAL;
042
043/**
044 * A memstore implementation which supports in-memory compaction.
045 * A compaction pipeline is added between the active set and the snapshot data structures;
046 * it consists of a list of segments that are subject to compaction.
047 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set.
048 * To ensure this property we take advantage of the existing blocking mechanism -- the active set
049 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
050 * Periodically, a compaction is applied in the background to all pipeline segments resulting
051 * in a single read-only component. The ``old'' segments are discarded when no scanner is reading
052 * them.
053 */
054@InterfaceAudience.Private
055public class CompactingMemStore extends AbstractMemStore {
056
  // Configuration key selecting the compacting MemStore behaviour (a MemoryCompactionPolicy name)
  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
      "hbase.hregion.compacting.memstore.type";
  // Default value for the key above: the string form of MemoryCompactionPolicy.NONE
  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
      String.valueOf(MemoryCompactionPolicy.NONE);
  // Configuration key for the fraction of the flush-to-disk size used as in-memory-flush size.
  // When it is unset (read as 0.0) initInmemoryFlushSize() falls back to the MSLAB chunk size.
  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
      "hbase.memstore.inmemoryflush.threshold.factor";
  // Multiplier applied to the MSLAB chunk size in that fallback computation
  private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
  // In-Memory compaction pool size: configuration key and its default value.
  // Note: the "CONPACTION" spelling is part of the public constant names and is kept as is.
  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
      "hbase.regionserver.inmemory.compaction.pool.size";
  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
070
  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
  // the store this memstore belongs to; source of the family name and the smallest read point
  private HStore store;
  // the read-only segments sitting between the active segment and the snapshot
  private CompactionPipeline pipeline;
  // performs the in-memory compaction; created through createMemStoreCompactor()
  protected MemStoreCompactor compactor;

  private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
  // true from the moment a compaction is claimed (setInMemoryCompactionFlag) until
  // setInMemoryCompactionCompleted() is called
  private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);

  // inWalReplay is true while we are synchronously replaying the edits from WAL
  private boolean inWalReplay = false;

  // when false, inMemoryCompaction() returns without starting the compactor (used by tests)
  @VisibleForTesting
  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
  // true: snapshot() moves the whole pipeline into the snapshot; false: only the pipeline tail
  private boolean compositeSnapshot = true;
085
  /**
   * Types of indexes (part of immutable segments) to be used after flattening,
   * compaction, or merge are applied.
   */
  public enum IndexType {
    CSLM_MAP,   // ConcurrentSkipListMap
    ARRAY_MAP,  // CellArrayMap
    CHUNK_MAP   // CellChunkMap
  }
095
  // default implementation; the constructor overrides it according to the MSLAB setting
  private IndexType indexType = IndexType.ARRAY_MAP;

  // Fixed heap overhead of one CompactingMemStore instance: the parent's overhead plus the
  // fields declared in this class, the pipeline and the compactor.
  public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
      + 6 * ClassSize.REFERENCE     // Store, CompactionPipeline,
      // MemStoreCompactor, inMemoryCompactionInProgress,
      // allowCompaction, indexType
      + Bytes.SIZEOF_LONG           // inmemoryFlushSize
      + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction
      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
106
107  public CompactingMemStore(Configuration conf, CellComparator c,
108      HStore store, RegionServicesForStores regionServices,
109      MemoryCompactionPolicy compactionPolicy) throws IOException {
110    super(conf, c, regionServices);
111    this.store = store;
112    this.regionServices = regionServices;
113    this.pipeline = new CompactionPipeline(getRegionServices());
114    this.compactor = createMemStoreCompactor(compactionPolicy);
115    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
116      // if user requested to work with MSLABs (whether on- or off-heap), then the
117      // immutable segments are going to use CellChunkMap as their index
118      indexType = IndexType.CHUNK_MAP;
119    } else {
120      indexType = IndexType.ARRAY_MAP;
121    }
122    // initialization of the flush size should happen after initialization of the index type
123    // so do not transfer the following method
124    initInmemoryFlushSize(conf);
125    LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
126            "compactor={}", this.store.getColumnFamilyName(),
127        StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
128        (this.compactor == null? "NULL": this.compactor.toString()));
129  }
130
131  @VisibleForTesting
132  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
133      throws IllegalArgumentIOException {
134    return new MemStoreCompactor(this, compactionPolicy);
135  }
136
137  private void initInmemoryFlushSize(Configuration conf) {
138    double factor = 0;
139    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
140    int numStores = getRegionServices().getNumStores();
141    if (numStores <= 1) {
142      // Family number might also be zero in some of our unit test case
143      numStores = 1;
144    }
145    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
146    if(factor != 0.0) {
147      // multiply by a factor (the same factor for all index types)
148      inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
149    } else {
150      inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER *
151          conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
152      inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
153    }
154  }
155
156  /**
157   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
158   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
159   *         the memstore may be changed while computing its size. It is the responsibility of the
160   *         caller to make sure this doesn't happen.
161   */
162  @Override
163  public MemStoreSize size() {
164    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
165    memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
166    for (Segment item : pipeline.getSegments()) {
167      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
168    }
169    return memstoreSizing.getMemStoreSize();
170  }
171
172  /**
173   * This method is called before the flush is executed.
174   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
175   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
176   */
177  @Override
178  public long preFlushSeqIDEstimation() {
179    if(compositeSnapshot) {
180      return HConstants.NO_SEQNUM;
181    }
182    Segment segment = getLastSegment();
183    if(segment == null) {
184      return HConstants.NO_SEQNUM;
185    }
186    return segment.getMinSequenceId();
187  }
188
189  @Override
190  public boolean isSloppy() {
191    return true;
192  }
193
194  /**
195   * Push the current active memstore segment into the pipeline
196   * and create a snapshot of the tail of current compaction pipeline
197   * Snapshot must be cleared by call to {@link #clearSnapshot}.
198   * {@link #clearSnapshot(long)}.
199   * @return {@link MemStoreSnapshot}
200   */
201  @Override
202  public MemStoreSnapshot snapshot() {
203    // If snapshot currently has entries, then flusher failed or didn't call
204    // cleanup.  Log a warning.
205    if (!this.snapshot.isEmpty()) {
206      LOG.warn("Snapshot called again without clearing previous. " +
207          "Doing nothing. Another ongoing flush or did we fail last attempt?");
208    } else {
209      LOG.debug("FLUSHING TO DISK {}, store={}",
210          getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
211      stopCompaction();
212      // region level lock ensures pushing active to pipeline is done in isolation
213      // no concurrent update operations trying to flush the active segment
214      pushActiveToPipeline(getActive());
215      snapshotId = EnvironmentEdgeManager.currentTime();
216      // in both cases whatever is pushed to snapshot is cleared from the pipeline
217      if (compositeSnapshot) {
218        pushPipelineToSnapshot();
219      } else {
220        pushTailToSnapshot();
221      }
222      compactor.resetStats();
223    }
224    return new MemStoreSnapshot(snapshotId, this.snapshot);
225  }
226
227  @Override
228  public MemStoreSize getFlushableSize() {
229    MemStoreSize mss = getSnapshotSize();
230    if (mss.getDataSize() == 0) {
231      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
232      if (compositeSnapshot) {
233        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
234        MutableSegment currActive = getActive();
235        if(!currActive.isEmpty()) {
236          memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
237        }
238        mss = memStoreSizing.getMemStoreSize();
239      } else {
240        mss = pipeline.getTailSize();
241      }
242    }
243    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
244  }
245
246
247  public void setInMemoryCompactionCompleted() {
248    inMemoryCompactionInProgress.set(false);
249  }
250
251  protected boolean setInMemoryCompactionFlag() {
252    return inMemoryCompactionInProgress.compareAndSet(false, true);
253  }
254
255  @Override
256  protected long keySize() {
257    // Need to consider dataSize/keySize of all segments in pipeline and active
258    long keySize = getActive().getDataSize();
259    for (Segment segment : this.pipeline.getSegments()) {
260      keySize += segment.getDataSize();
261    }
262    return keySize;
263  }
264
265  @Override
266  protected long heapSize() {
267    // Need to consider heapOverhead of all segments in pipeline and active
268    long h = getActive().getHeapSize();
269    for (Segment segment : this.pipeline.getSegments()) {
270      h += segment.getHeapSize();
271    }
272    return h;
273  }
274
275  @Override
276  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
277    long minSequenceId = pipeline.getMinSequenceId();
278    if(minSequenceId != Long.MAX_VALUE) {
279      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
280      byte[] familyName = getFamilyNameInBytes();
281      WAL WAL = getRegionServices().getWAL();
282      if (WAL != null) {
283        WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
284      }
285    }
286  }
287
288  /**
289   * This message intends to inform the MemStore that next coming updates
290   * are going to be part of the replaying edits from WAL
291   */
292  @Override
293  public void startReplayingFromWAL() {
294    inWalReplay = true;
295  }
296
297  /**
298   * This message intends to inform the MemStore that the replaying edits from WAL
299   * are done
300   */
301  @Override
302  public void stopReplayingFromWAL() {
303    inWalReplay = false;
304  }
305
306  /**
307   * Issue any synchronization and test needed before applying the update
308   * For compacting memstore this means checking the update can increase the size without
309   * overflow
310   * @param currentActive the segment to be updated
311   * @param cell the cell to be added
312   * @param memstoreSizing object to accumulate region size changes
313   * @return true iff can proceed with applying the update
314   */
315  @Override
316  protected boolean preUpdate(MutableSegment currentActive, Cell cell,
317      MemStoreSizing memstoreSizing) {
318    if (currentActive.sharedLock()) {
319      if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
320        return true;
321      }
322      currentActive.sharedUnlock();
323    }
324    return false;
325  }
326
327  @Override protected void postUpdate(MutableSegment currentActive) {
328    currentActive.sharedUnlock();
329  }
330
331  @Override protected boolean sizeAddedPreOperation() {
332    return true;
333  }
334
335  // the getSegments() method is used for tests only
336  @VisibleForTesting
337  @Override
338  protected List<Segment> getSegments() {
339    List<? extends Segment> pipelineList = pipeline.getSegments();
340    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
341    list.add(getActive());
342    list.addAll(pipelineList);
343    list.addAll(snapshot.getAllSegments());
344
345    return list;
346  }
347
348  // the following three methods allow to manipulate the settings of composite snapshot
349  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
350    this.compositeSnapshot = useCompositeSnapshot;
351  }
352
353  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
354      boolean merge) {
355    // last true stands for updating the region size
356    return pipeline.swap(versionedList, result, !merge, true);
357  }
358
359  /**
360   * @param requesterVersion The caller must hold the VersionedList of the pipeline
361   *           with version taken earlier. This version must be passed as a parameter here.
362   *           The flattening happens only if versions match.
363   */
364  public void flattenOneSegment(long requesterVersion,  MemStoreCompactionStrategy.Action action) {
365    pipeline.flattenOneSegment(requesterVersion, indexType, action);
366  }
367
368  // setter is used only for testability
369  @VisibleForTesting
370  void setIndexType(IndexType type) {
371    indexType = type;
372    // Because this functionality is for testing only and tests are setting in-memory flush size
373    // according to their need, there is no setting of in-memory flush size, here.
374    // If it is needed, please change in-memory flush size explicitly
375  }
376
377  public IndexType getIndexType() {
378    return indexType;
379  }
380
381  public boolean hasImmutableSegments() {
382    return !pipeline.isEmpty();
383  }
384
385  public VersionedSegmentsList getImmutableSegments() {
386    return pipeline.getVersionedList();
387  }
388
389  public long getSmallestReadPoint() {
390    return store.getSmallestReadPoint();
391  }
392
393  public HStore getStore() {
394    return store;
395  }
396
397  public String getFamilyName() {
398    return Bytes.toString(getFamilyNameInBytes());
399  }
400
401  @Override
402  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
403    MutableSegment activeTmp = getActive();
404    List<? extends Segment> pipelineList = pipeline.getSegments();
405    List<? extends Segment> snapshotList = snapshot.getAllSegments();
406    long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
407    // The list of elements in pipeline + the active element + the snapshot segment
408    List<KeyValueScanner> list = createList((int) numberOfSegments);
409    addToScanners(activeTmp, readPt, list);
410    addToScanners(pipelineList, readPt, list);
411    addToScanners(snapshotList, readPt, list);
412    return list;
413  }
414
415  @VisibleForTesting
416  protected List<KeyValueScanner> createList(int capacity) {
417    return new ArrayList<>(capacity);
418  }
419
420  /**
421   * Check whether anything need to be done based on the current active set size.
422   * The method is invoked upon every addition to the active set.
423   * For CompactingMemStore, flush the active set to the read-only memory if it's
424   * size is above threshold
425   * @param currActive intended segment to update
426   * @param cellToAdd cell to be added to the segment
427   * @param memstoreSizing object to accumulate changed size
428   * @return true if the cell can be added to the
429   */
430  private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
431      MemStoreSizing memstoreSizing) {
432    if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
433      if (currActive.setInMemoryFlushed()) {
434        flushInMemory(currActive);
435        if (setInMemoryCompactionFlag()) {
436          // The thread is dispatched to do in-memory compaction in the background
437          InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
438          if (LOG.isTraceEnabled()) {
439            LOG.trace("Dispatching the MemStore in-memory flush for store " + store
440                .getColumnFamilyName());
441          }
442          getPool().execute(runnable);
443        }
444      }
445      return false;
446    }
447    return true;
448  }
449
450  // externally visible only for tests
451  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
452  // otherwise there is a deadlock
453  @VisibleForTesting
454  void flushInMemory() {
455    MutableSegment currActive = getActive();
456    if(currActive.setInMemoryFlushed()) {
457      flushInMemory(currActive);
458    }
459    inMemoryCompaction();
460  }
461
462  private void flushInMemory(MutableSegment currActive) {
463    LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
464    pushActiveToPipeline(currActive);
465  }
466
467  void inMemoryCompaction() {
468    // setting the inMemoryCompactionInProgress flag again for the case this method is invoked
469    // directly (only in tests) in the common path setting from true to true is idempotent
470    inMemoryCompactionInProgress.set(true);
471    // Used by tests
472    if (!allowCompaction.get()) {
473      return;
474    }
475    try {
476      // Speculative compaction execution, may be interrupted if flush is forced while
477      // compaction is in progress
478      if(!compactor.start()) {
479        setInMemoryCompactionCompleted();
480      }
481    } catch (IOException e) {
482      LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
483          getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
484    }
485  }
486
487  private Segment getLastSegment() {
488    Segment localActive = getActive();
489    Segment tail = pipeline.getTail();
490    return tail == null ? localActive : tail;
491  }
492
493  private byte[] getFamilyNameInBytes() {
494    return store.getColumnFamilyDescriptor().getName();
495  }
496
497  private ThreadPoolExecutor getPool() {
498    return getRegionServices().getInMemoryCompactionPool();
499  }
500
501  @VisibleForTesting
502  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
503      MemStoreSizing memstoreSizing) {
504    long cellSize = currActive.getCellLength(cellToAdd);
505    long segmentDataSize = currActive.getDataSize();
506    while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
507      // when replaying edits from WAL there is no need in in-memory flush regardless the size
508      // otherwise size below flush threshold try to update atomically
509      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
510        if (memstoreSizing != null) {
511          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
512        }
513        // enough space for cell - no need to flush
514        return false;
515      }
516      segmentDataSize = currActive.getDataSize();
517    }
518    // size above flush threshold
519    return true;
520  }
521
522  /**
523   * The request to cancel the compaction asynchronous task (caused by in-memory flush)
524   * The compaction may still happen if the request was sent too late
525   * Non-blocking request
526   */
527  private void stopCompaction() {
528    if (inMemoryCompactionInProgress.get()) {
529      compactor.stop();
530    }
531  }
532
533  protected void pushActiveToPipeline(MutableSegment currActive) {
534    if (!currActive.isEmpty()) {
535      pipeline.pushHead(currActive);
536      resetActive();
537    }
538  }
539
540  private void pushTailToSnapshot() {
541    VersionedSegmentsList segments = pipeline.getVersionedTail();
542    pushToSnapshot(segments.getStoreSegments());
543    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
544    pipeline.swap(segments,null,false, false);
545  }
546
547  private void pushPipelineToSnapshot() {
548    int iterationsCnt = 0;
549    boolean done = false;
550    while (!done) {
551      iterationsCnt++;
552      VersionedSegmentsList segments = pipeline.getVersionedList();
553      pushToSnapshot(segments.getStoreSegments());
554      // swap can return false in case the pipeline was updated by ongoing compaction
555      // and the version increase, the chance of it happenning is very low
556      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
557      done = pipeline.swap(segments, null, false, false);
558      if (iterationsCnt>2) {
559        // practically it is impossible that this loop iterates more than two times
560        // (because the compaction is stopped and none restarts it while in snapshot request),
561        // however stopping here for the case of the infinite loop causing by any error
562        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
563            " while flushing to disk.");
564        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
565        break;
566      }
567    }
568  }
569
570  private void pushToSnapshot(List<ImmutableSegment> segments) {
571    if(segments.isEmpty()) return;
572    if(segments.size() == 1 && !segments.get(0).isEmpty()) {
573      this.snapshot = segments.get(0);
574      return;
575    } else { // create composite snapshot
576      this.snapshot =
577          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
578    }
579  }
580
581  private RegionServicesForStores getRegionServices() {
582    return regionServices;
583  }
584
585  /**
586   * The in-memory-flusher thread performs the flush asynchronously.
587   * There is at most one thread per memstore instance.
588   * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
589   * and compacts the pipeline.
590   */
591  private class InMemoryCompactionRunnable implements Runnable {
592    @Override
593    public void run() {
594      inMemoryCompaction();
595    }
596  }
597
598  @VisibleForTesting
599  boolean isMemStoreFlushingInMemory() {
600    return inMemoryCompactionInProgress.get();
601  }
602
603  /**
604   * @param cell Find the row that comes after this one.  If null, we return the
605   *             first.
606   * @return Next row or null if none found.
607   */
608  Cell getNextRow(final Cell cell) {
609    Cell lowest = null;
610    List<Segment> segments = getSegments();
611    for (Segment segment : segments) {
612      if (lowest == null) {
613        lowest = getNextRow(cell, segment.getCellSet());
614      } else {
615        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
616      }
617    }
618    return lowest;
619  }
620
621  @VisibleForTesting
622  long getInmemoryFlushSize() {
623    return inmemoryFlushSize;
624  }
625
626  // debug method
627  public void debug() {
628    String msg = "active size=" + getActive().getDataSize();
629    msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
630    msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" :
631        "false");
632    LOG.debug(msg);
633  }
634
635}