001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020package org.apache.hadoop.hbase.client;
021
022import java.io.IOException;
023import java.util.AbstractMap.SimpleEntry;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.LinkedBlockingQueue;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
051
052/**
053 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
054 * Each put will be sharded into different buffer queues based on its destination region server.
055 * So each region server buffer queue will only have the puts which share the same destination.
056 * And each queue will have a flush worker thread to flush the puts request to the region server.
057 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
058 * particular queue.
059 *
 * Also, every put is retried a configurable number of times before it is dropped.
061 * And the HTableMultiplexer can report the number of buffered requests and the number of the
062 * failed (dropped) requests in total or on per region server basis.
063 *
064 * This class is thread safe.
065 */
066@InterfaceAudience.Public
067public class HTableMultiplexer {
  /** Logger shared by the multiplexer and its flush workers. */
  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());

  /** Configuration key for the flush period in milliseconds; read with a default of 100. */
  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  /** Configuration key for the size of the scheduled flush thread pool; read with a default of 10. */
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  /**
   * Configuration key limiting how many failed puts a flush worker may have waiting for
   * resubmission at once; read with a default of 10000.
   */
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";

  /** The map between each region server to its flush worker */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  /** Copy of the user configuration with client retries set to 0; handed to flush workers. */
  private final Configuration workerConf;
  /** Connection used to look up region locations and to send the buffered puts. */
  private final ClusterConnection conn;
  /** Executor passed to each flush worker for its AsyncProcess tasks. */
  private final ExecutorService pool;
  /** Total number of tries per put: the configured retry number plus one. */
  private final int maxAttempts;
  /** Capacity of each flush worker's buffer queue. */
  private final int perRegionServerBufferQueueSize;
  /** Limit passed to HTable.validatePut when a put is submitted. */
  private final int maxKeyValueSize;
  /** Scheduler that runs the flush workers periodically and the delayed resubmissions. */
  private final ScheduledExecutorService executor;
  /** Delay between two flushes of a worker, in milliseconds; also the base pause for retries. */
  private final long flushPeriod;
088
089  /**
090   * @param conf The HBaseConfiguration
091   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
092   *          each region server before dropping the request.
093   */
094  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
095      throws IOException {
096    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
097  }
098
099  /**
100   * @param conn The HBase connection.
101   * @param conf The HBase configuration
102   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
103   *          each region server before dropping the request.
104   */
105  public HTableMultiplexer(Connection conn, Configuration conf,
106      int perRegionServerBufferQueueSize) {
107    this.conn = (ClusterConnection) conn;
108    this.pool = HTable.getDefaultExecutor(conf);
109    // how many times we could try in total, one more than retry number
110    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
111        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
112    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
113    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
114    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
115    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
116    this.executor =
117        Executors.newScheduledThreadPool(initThreads,
118          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
119
120    this.workerConf = HBaseConfiguration.create(conf);
121    // We do not do the retry because we need to reassign puts to different queues if regions are
122    // moved.
123    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
124  }
125
126  /**
127   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
128   * been closed.
129   * @throws IOException If there is an error closing the connection.
130   */
131  @SuppressWarnings("deprecation")
132  public synchronized void close() throws IOException {
133    if (!getConnection().isClosed()) {
134      getConnection().close();
135    }
136  }
137
138  /**
139   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
140   * is already full.
141   * @param tableName
142   * @param put
143   * @return true if the request can be accepted by its corresponding buffer queue.
144   */
145  public boolean put(TableName tableName, final Put put) {
146    return put(tableName, put, this.maxAttempts);
147  }
148
149  /**
150   * The puts request will be buffered by their corresponding buffer queue.
151   * Return the list of puts which could not be queued.
152   * @param tableName
153   * @param puts
154   * @return the list of puts which could not be queued
155   */
156  public List<Put> put(TableName tableName, final List<Put> puts) {
157    if (puts == null)
158      return null;
159
160    List <Put> failedPuts = null;
161    boolean result;
162    for (Put put : puts) {
163      result = put(tableName, put, this.maxAttempts);
164      if (result == false) {
165
166        // Create the failed puts list if necessary
167        if (failedPuts == null) {
168          failedPuts = new ArrayList<>();
169        }
170        // Add the put to the failed puts list
171        failedPuts.add(put);
172      }
173    }
174    return failedPuts;
175  }
176
177  /**
178   * @deprecated Use {@link #put(TableName, List) } instead.
179   */
180  @Deprecated
181  public List<Put> put(byte[] tableName, final List<Put> puts) {
182    return put(TableName.valueOf(tableName), puts);
183  }
184
185  /**
186   * The put request will be buffered by its corresponding buffer queue. And the put request will be
187   * retried before dropping the request.
188   * Return false if the queue is already full.
189   * @return true if the request can be accepted by its corresponding buffer queue.
190   */
191  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
192    if (maxAttempts <= 0) {
193      return false;
194    }
195
196    try {
197      HTable.validatePut(put, maxKeyValueSize);
198      // Allow mocking to get at the connection, but don't expose the connection to users.
199      ClusterConnection conn = (ClusterConnection) getConnection();
200      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
201      // as necessary. We shouldn't have to do that here.
202      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
203      if (loc != null) {
204        // Add the put pair into its corresponding queue.
205        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
206
207        // Generate a MultiPutStatus object and offer it into the queue
208        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);
209
210        return queue.offer(s);
211      }
212    } catch (IOException e) {
213      LOG.debug("Cannot process the put " + put, e);
214    }
215    return false;
216  }
217
218  /**
219   * @deprecated Use {@link #put(TableName, Put) } instead.
220   */
221  @Deprecated
222  public boolean put(final byte[] tableName, final Put put, int retry) {
223    return put(TableName.valueOf(tableName), put, retry);
224  }
225
226  /**
227   * @deprecated Use {@link #put(TableName, Put)} instead.
228   */
229  @Deprecated
230  public boolean put(final byte[] tableName, Put put) {
231    return put(TableName.valueOf(tableName), put);
232  }
233
234  /**
235   * @return the current HTableMultiplexerStatus
236   */
237  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
238    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
239  }
240
241  @VisibleForTesting
242  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
243    FlushWorker worker = serverToFlushWorkerMap.get(addr);
244    if (worker == null) {
245      synchronized (this.serverToFlushWorkerMap) {
246        worker = serverToFlushWorkerMap.get(addr);
247        if (worker == null) {
248          // Create the flush worker
249          worker = new FlushWorker(workerConf, this.conn, addr, this,
250              perRegionServerBufferQueueSize, pool, executor);
251          this.serverToFlushWorkerMap.put(addr, worker);
252          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
253        }
254      }
255    }
256    return worker.getQueue();
257  }
258
259  @VisibleForTesting
260  ClusterConnection getConnection() {
261    return this.conn;
262  }
263
264  /**
265   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
266   * report the number of buffered requests and the number of the failed (dropped) requests
267   * in total or on per region server basis.
268   */
269  @InterfaceAudience.Public
270  public static class HTableMultiplexerStatus {
271    private long totalFailedPutCounter;
272    private long totalBufferedPutCounter;
273    private long maxLatency;
274    private long overallAverageLatency;
275    private Map<String, Long> serverToFailedCounterMap;
276    private Map<String, Long> serverToBufferedCounterMap;
277    private Map<String, Long> serverToAverageLatencyMap;
278    private Map<String, Long> serverToMaxLatencyMap;
279
280    public HTableMultiplexerStatus(
281        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
282      this.totalBufferedPutCounter = 0;
283      this.totalFailedPutCounter = 0;
284      this.maxLatency = 0;
285      this.overallAverageLatency = 0;
286      this.serverToBufferedCounterMap = new HashMap<>();
287      this.serverToFailedCounterMap = new HashMap<>();
288      this.serverToAverageLatencyMap = new HashMap<>();
289      this.serverToMaxLatencyMap = new HashMap<>();
290      this.initialize(serverToFlushWorkerMap);
291    }
292
293    private void initialize(
294        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
295      if (serverToFlushWorkerMap == null) {
296        return;
297      }
298
299      long averageCalcSum = 0;
300      int averageCalcCount = 0;
301      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
302          .entrySet()) {
303        HRegionLocation addr = entry.getKey();
304        FlushWorker worker = entry.getValue();
305
306        long bufferedCounter = worker.getTotalBufferedCount();
307        long failedCounter = worker.getTotalFailedCount();
308        long serverMaxLatency = worker.getMaxLatency();
309        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
310        // Get sum and count pieces separately to compute overall average
311        SimpleEntry<Long, Integer> averageComponents = averageCounter
312            .getComponents();
313        long serverAvgLatency = averageCounter.getAndReset();
314
315        this.totalBufferedPutCounter += bufferedCounter;
316        this.totalFailedPutCounter += failedCounter;
317        if (serverMaxLatency > this.maxLatency) {
318          this.maxLatency = serverMaxLatency;
319        }
320        averageCalcSum += averageComponents.getKey();
321        averageCalcCount += averageComponents.getValue();
322
323        this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
324            bufferedCounter);
325        this.serverToFailedCounterMap
326            .put(addr.getHostnamePort(),
327            failedCounter);
328        this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
329            serverAvgLatency);
330        this.serverToMaxLatencyMap
331            .put(addr.getHostnamePort(),
332            serverMaxLatency);
333      }
334      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
335          / averageCalcCount : 0;
336    }
337
338    public long getTotalBufferedCounter() {
339      return this.totalBufferedPutCounter;
340    }
341
342    public long getTotalFailedCounter() {
343      return this.totalFailedPutCounter;
344    }
345
346    public long getMaxLatency() {
347      return this.maxLatency;
348    }
349
350    public long getOverallAverageLatency() {
351      return this.overallAverageLatency;
352    }
353
354    public Map<String, Long> getBufferedCounterForEachRegionServer() {
355      return this.serverToBufferedCounterMap;
356    }
357
358    public Map<String, Long> getFailedCounterForEachRegionServer() {
359      return this.serverToFailedCounterMap;
360    }
361
362    public Map<String, Long> getMaxLatencyForEachRegionServer() {
363      return this.serverToMaxLatencyMap;
364    }
365
366    public Map<String, Long> getAverageLatencyForEachRegionServer() {
367      return this.serverToAverageLatencyMap;
368    }
369  }
370
371  @VisibleForTesting
372  static class PutStatus {
373    final RegionInfo regionInfo;
374    final Put put;
375    final int maxAttempCount;
376
377    public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
378      this.regionInfo = regionInfo;
379      this.put = put;
380      this.maxAttempCount = maxAttempCount;
381    }
382  }
383
384  /**
385   * Helper to count the average over an interval until reset.
386   */
387  private static class AtomicAverageCounter {
388    private long sum;
389    private int count;
390
391    public AtomicAverageCounter() {
392      this.sum = 0L;
393      this.count = 0;
394    }
395
396    public synchronized long getAndReset() {
397      long result = this.get();
398      this.reset();
399      return result;
400    }
401
402    public synchronized long get() {
403      if (this.count == 0) {
404        return 0;
405      }
406      return this.sum / this.count;
407    }
408
409    public synchronized SimpleEntry<Long, Integer> getComponents() {
410      return new SimpleEntry<>(sum, count);
411    }
412
413    public synchronized void reset() {
414      this.sum = 0L;
415      this.count = 0;
416    }
417
418    public synchronized void add(long value) {
419      this.sum += value;
420      this.count++;
421    }
422  }
423
424  @VisibleForTesting
425  static class FlushWorker implements Runnable {
    /** Location this worker serves; its server name is the target of every flush. */
    private final HRegionLocation addr;
    /** Bounded buffer of puts waiting to be flushed. */
    private final LinkedBlockingQueue<PutStatus> queue;
    /** Owning multiplexer; failed puts are resubmitted through it. */
    private final HTableMultiplexer multiplexer;
    /** Number of puts this worker has counted as failed. */
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    /** Size of the batch drained by a flush; added to the queue size in getTotalBufferedCount(). */
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    /** Average flush latency since the counter was last reset. */
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    /** Largest flush latency since it was last read through getMaxLatency(). */
    private final AtomicLong maxLatency = new AtomicLong(0);

    /** Sends the drained puts to the region server. */
    private final AsyncProcess ap;
    /** Scratch list that the queue is drained into at the start of every run. */
    private final List<PutStatus> processingList = new ArrayList<>();
    /** Scheduler used for the delayed resubmission of failed puts. */
    private final ScheduledExecutorService executor;
    /** Upper bound for {@link #retryInQueue}. */
    private final int maxRetryInQueue;
    /** Number of failed puts currently scheduled for resubmission. */
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
    /** Operation timeout set on every AsyncProcessTask built in run(). */
    private final int operationTimeout;
    /** Thread pool set on every AsyncProcessTask built in run(). */
    private final ExecutorService pool;
442    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
443        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
444        ExecutorService pool, ScheduledExecutorService executor) {
445      this.addr = addr;
446      this.multiplexer = htableMultiplexer;
447      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
448      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
449      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
450      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
451          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
452              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
453      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
454          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
455      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
456      this.executor = executor;
457      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
458      this.pool = pool;
459    }
460
461    protected LinkedBlockingQueue<PutStatus> getQueue() {
462      return this.queue;
463    }
464
465    public long getTotalFailedCount() {
466      return totalFailedPutCount.get();
467    }
468
469    public long getTotalBufferedCount() {
470      return (long) queue.size() + currentProcessingCount.get();
471    }
472
473    public AtomicAverageCounter getAverageLatencyCounter() {
474      return this.averageLatency;
475    }
476
477    public long getMaxLatency() {
478      return this.maxLatency.getAndSet(0);
479    }
480
481    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
482      // Decrease the retry count
483      final int retryCount = ps.maxAttempCount - 1;
484
485      if (retryCount <= 0) {
486        // Update the failed counter and no retry any more.
487        return false;
488      }
489
490      int cnt = getRetryInQueue().incrementAndGet();
491      if (cnt > getMaxRetryInQueue()) {
492        // Too many Puts in queue for resubmit, give up this
493        getRetryInQueue().decrementAndGet();
494        return false;
495      }
496
497      final Put failedPut = ps.put;
498      // The currentPut is failed. So get the table name for the currentPut.
499      final TableName tableName = ps.regionInfo.getTable();
500
501      long delayMs = getNextDelay(retryCount);
502      if (LOG.isDebugEnabled()) {
503        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
504      }
505
506      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
507      // the region location cache when the Put original failed with some exception. If we keep
508      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
509      // that we expect it to.
510      getExecutor().schedule(new Runnable() {
511        @Override
512        public void run() {
513          boolean succ = false;
514          try {
515            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
516          } finally {
517            FlushWorker.this.getRetryInQueue().decrementAndGet();
518            if (!succ) {
519              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
520            }
521          }
522        }
523      }, delayMs, TimeUnit.MILLISECONDS);
524      return true;
525    }
526
527    @VisibleForTesting
528    long getNextDelay(int retryCount) {
529      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
530          multiplexer.maxAttempts - retryCount - 1);
531    }
532
533    @VisibleForTesting
534    AtomicInteger getRetryInQueue() {
535      return this.retryInQueue;
536    }
537
538    @VisibleForTesting
539    int getMaxRetryInQueue() {
540      return this.maxRetryInQueue;
541    }
542
543    @VisibleForTesting
544    AtomicLong getTotalFailedPutCount() {
545      return this.totalFailedPutCount;
546    }
547
548    @VisibleForTesting
549    HTableMultiplexer getMultiplexer() {
550      return this.multiplexer;
551    }
552
553    @VisibleForTesting
554    ScheduledExecutorService getExecutor() {
555      return this.executor;
556    }
557
558    @Override
559    public void run() {
560      int failedCount = 0;
561      try {
562        long start = EnvironmentEdgeManager.currentTime();
563
564        // drain all the queued puts into the tmp list
565        processingList.clear();
566        queue.drainTo(processingList);
567        if (processingList.isEmpty()) {
568          // Nothing to flush
569          return;
570        }
571
572        currentProcessingCount.set(processingList.size());
573        // failedCount is decreased whenever a Put is success or resubmit.
574        failedCount = processingList.size();
575
576        List<Action> retainedActions = new ArrayList<>(processingList.size());
577        MultiAction actions = new MultiAction();
578        for (int i = 0; i < processingList.size(); i++) {
579          PutStatus putStatus = processingList.get(i);
580          Action action = new Action(putStatus.put, i);
581          actions.add(putStatus.regionInfo.getRegionName(), action);
582          retainedActions.add(action);
583        }
584
585        // Process this multi-put request
586        List<PutStatus> failed = null;
587        Object[] results = new Object[actions.size()];
588        ServerName server = addr.getServerName();
589        Map<ServerName, MultiAction> actionsByServer =
590            Collections.singletonMap(server, actions);
591        try {
592          AsyncProcessTask task = AsyncProcessTask.newBuilder()
593                  .setResults(results)
594                  .setPool(pool)
595                  .setRpcTimeout(writeRpcTimeout)
596                  .setOperationTimeout(operationTimeout)
597                  .build();
598          AsyncRequestFuture arf =
599              ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
600          arf.waitUntilDone();
601          if (arf.hasError()) {
602            // We just log and ignore the exception here since failed Puts will be resubmit again.
603            LOG.debug("Caught some exceptions when flushing puts to region server "
604                + addr.getHostnamePort(), arf.getErrors());
605          }
606        } finally {
607          for (int i = 0; i < results.length; i++) {
608            if (results[i] instanceof Result) {
609              failedCount--;
610            } else {
611              if (failed == null) {
612                failed = new ArrayList<>();
613              }
614              failed.add(processingList.get(i));
615            }
616          }
617        }
618
619        if (failed != null) {
620          // Resubmit failed puts
621          for (PutStatus putStatus : failed) {
622            if (resubmitFailedPut(putStatus, this.addr)) {
623              failedCount--;
624            }
625          }
626        }
627
628        long elapsed = EnvironmentEdgeManager.currentTime() - start;
629        // Update latency counters
630        averageLatency.add(elapsed);
631        if (elapsed > maxLatency.get()) {
632          maxLatency.set(elapsed);
633        }
634
635        // Log some basic info
636        if (LOG.isDebugEnabled()) {
637          LOG.debug("Processed " + currentProcessingCount + " put requests for "
638              + addr.getHostnamePort() + " and " + failedCount + " failed"
639              + ", latency for this send: " + elapsed);
640        }
641
642        // Reset the current processing put count
643        currentProcessingCount.set(0);
644      } catch (RuntimeException e) {
645        // To make findbugs happy
646        // Log all the exceptions and move on
647        LOG.debug(
648          "Caught some exceptions " + e + " when flushing puts to region server "
649              + addr.getHostnamePort(), e);
650      } catch (Exception e) {
651        if (e instanceof InterruptedException) {
652          Thread.currentThread().interrupt();
653        }
654        // Log all the exceptions and move on
655        LOG.debug(
656          "Caught some exceptions " + e + " when flushing puts to region server "
657              + addr.getHostnamePort(), e);
658      } finally {
659        // Update the totalFailedCount
660        this.totalFailedPutCount.addAndGet(failedCount);
661      }
662    }
663  }
664}