clean-jsdoc-theme API
queue.jsjavascript
/**
 * The event-driven task {@link Queue} and its supporting types.
 *
 * @module forge/queue
 */

/**
 * A unit of work the {@link Queue} can run.
 *
 * @typedef {Object} Task
 * @property {string} id - Stable identifier for the task.
 * @property {Priority} [priority=Priority.NORMAL] - Relative scheduling weight.
 * @property {TaskHandler} run - The function invoked when the task is scheduled.
 */

/**
 * The function a {@link Task} runs when scheduled.
 *
 * @callback TaskHandler
 * @param {AbortSignal} signal - Aborted if the queue is cleared mid-flight.
 * @returns {Promise<void>|void} Resolves when the task is complete.
 */

/**
 * Maximum concurrency the queue will ever allow, regardless of options.
 *
 * @constant {number}
 * @default
 */
export const MAX_CONCURRENCY = 32;

/**
 * An ordered, event-driven task runner.
 *
 * `Queue` extends Node's `EventEmitter`, so the inheritance chain and the
 * emitted events both show up in the generated page.
 *
 * > [!CAUTION]
 * > Tasks pushed after {@link Queue#clear} runs are rejected — clearing aborts
 * > the in-flight signal.
 *
 * @category Core/Runtime order=1
 * @extends EventEmitter
 * @fires Queue#event:drain
 * @fires Queue#event:error
 * @listens module:forge/cache.Cache#event:evict
 * @example <caption>Draining a queue</caption>
 * const q = new Queue({ concurrency: 2 });
 * q.on('drain', () => console.log('done'));
 * q.push({ id: 't1', run: async () => doWork() });
 * await q.drain();
 * @playground
 *
 * @since 1.1.0
 * @version 1.3.0
 * @requires module:forge/cache
 * @todo Add backpressure when the pending list exceeds a high-water mark.
 * @copyright 2024 The clean-jsdoc-theme team
 * @author The clean-jsdoc-theme team
 */
export class Queue {
  /**
   * @param {Object} [options={}] - Queue configuration.
   * @param {number} [options.concurrency=1] - How many tasks run at once
   *   (clamped to {@link MAX_CONCURRENCY}).
   * @param {boolean} [options.autoStart=true] - Start draining as soon as a
   *   task is pushed.
   */
  constructor(options = {}) {
    /**
     * Effective concurrency, clamped to {@link MAX_CONCURRENCY}.
     *
     * @readonly
     * @type {number}
     */
    this.concurrency = Math.min(options.concurrency ?? 1, MAX_CONCURRENCY);

    /**
     * Pending tasks, highest priority first.
     *
     * @private
     * @type {Task[]}
     */
    this._pending = [];
  }

  /**
   * Add a task to the queue, inserted by {@link Priority}.
   *
   * @param {Task} task - The task to enqueue.
   * @returns {this} The queue, for chaining.
   */
  push(task) {
    this._pending.push(task);
    return this;
  }

  /**
   * Run every pending task and resolve once the queue is empty.
   *
   * @async
   * @returns {Promise<number>} The number of tasks that ran.
   * @fires Queue#event:drain
   * @example
   * const ran = await queue.drain();
   * @playground codesandbox filename=drain.js highlight=2,3
   */
  async drain() {
    const count = this._pending.length;
    this._pending = [];
    return count;
  }

  /**
   * Iterate pending tasks in scheduling order, highest priority first.
   *
   * @generator
   * @yields {Task} The next pending task.
   * @example <caption>The {@lang} directive forces this block's language</caption>
   * {@lang ts}
   * for (const task of queue.tasks()) {
   *   console.log(task.id satisfies string);
   * }
   *
   * @playground
   */
  *tasks() {
    yield* this._pending;
  }

  /**
   * Handle a task failure. Subclasses **must** implement this.
   *
   * @abstract
   * @param {Error} error - The error a task threw.
   * @returns {void}
   */
  onError(error) {
    throw new Error('onError() must be implemented by a subclass');
  }

  /**
   * Pull and start the next task. Internal scheduling step.
   *
   * @protected
   * @returns {Task|undefined} The task that was started, if any.
   */
  _next() {
    return this._pending.shift();
  }

  /**
   * Stop and clear the queue, aborting any in-flight task.
   *
   * @override
   * @returns {void}
   */
  clear() {
    this._pending = [];
  }
}

/**
 * Fired once the queue has processed every pending task.
 *
 * @event Queue#event:drain
 */

/**
 * Fired when a task throws and {@link Queue#onError} re-raises.
 *
 * @event Queue#event:error
 * @type {Error}
 */