ecs-observable/src/commands/command-queue.ts

144 lines
4.5 KiB
TypeScript

import { query as makeQuery } from "../query";
import type { World, Entity, ComponentDef, Query } from "../index";
// ── Types ────────────────────────────────────────────
/**
* Callback that processes one command taken off an entity.
*
* `command` is the component data that was read from the entity and `entity`
* is the entity that carried it. `entity` is declared optional, so an
* implementation may ignore it; `CommandQueue.execute()` always supplies it.
*
* By the time the handler runs, the command component has already been
* removed from the entity, and the entity itself has been destroyed if that
* removal left it without components.
*/
export type CommandHandler<T extends Record<string, any>> = (
command: T,
entity?: Entity,
) => void;
/**
* Pending work: entity, its command data, and the handler to invoke.
*
* Built by `CommandQueue.execute()` after the command component has been
* removed from the entity, and consumed later in the same call.
*/
interface Pending<T extends Record<string, any> = any> {
// Entity the command component was removed from.
entity: Entity;
// Handler registered for the command's component definition.
handler: CommandHandler<T>;
// Component value read from the entity just before removal.
data: T;
}
/**
* Registered handler bookkeeping.
*
* One record is created per `CommandQueue.handle()` call.
*/
interface Registration<T extends Record<string, any> = any> {
// Component definition identifying the command type.
def: ComponentDef<T>;
// Query built from `def` when the handler is registered; used to find entities carrying the command.
query: Query;
// Callback invoked for each command of this type.
handler: CommandHandler<T>;
}
// ── CommandQueue ─────────────────────────────────────
/**
 * Pull-based command system.
 *
 * Register handlers for command component types, then call `execute()` each
 * frame. It scans the world for entities carrying command components, removes
 * those components, destroys entities that become empty, and then dispatches
 * the captured commands to the registered handlers.
 *
 * Interruptions pause processing — while any tracked promise is unsettled,
 * `execute()` is a no-op.
 *
 * @example
 * ```ts
 * const Damage = defineComponent('damage', { amount: 0 });
 *
 * const queue = new CommandQueue(world);
 * queue.handle(Damage, (cmd, entity) => {
 *   const hp = world.get(entity!, Health);
 *   hp.current -= cmd.amount;
 * });
 *
 * // each frame:
 * queue.execute();
 * ```
 */
export class CommandQueue {
  private readonly _world: World;
  private readonly _registrations: Registration[] = [];
  /** Interruption promises that have not settled yet. */
  private readonly _pendingPromises = new Set<Promise<any>>();

  constructor(world: World) {
    this._world = world;
  }

  // ── Registration ─────────────────────────────────

  /**
   * Register a handler for `def`. Each handler is called once per entity per
   * frame.
   *
   * Several handlers may be registered for the same definition; every one of
   * them receives each command of that type. Handlers run in registration
   * order.
   *
   * @returns this queue, so registrations can be chained.
   */
  handle<T extends Record<string, any>>(
    def: ComponentDef<T>,
    handler: CommandHandler<T>,
  ): this {
    this._registrations.push({ def, query: makeQuery(def), handler });
    return this;
  }

  // ── Interruption ─────────────────────────────────

  /**
   * Track a promise. While any tracked promise is unsettled,
   * `execute()` skips command processing.
   *
   * Once all tracked promises have settled (fulfilled or rejected),
   * processing resumes. Calling this from inside a handler does not stop the
   * `execute()` call that is already dispatching; it only affects later calls.
   */
  interrupt(promise: Promise<any>): void {
    this._pendingPromises.add(promise);
    const release = (): void => {
      this._pendingPromises.delete(promise);
    };
    // A rejection releases the interruption too, so a failed promise cannot
    // stall the queue. `void` marks the derived promise as deliberately unused.
    void promise.then(release, release);
  }

  /** True while at least one interruption promise is pending. */
  get isInterrupted(): boolean {
    return this._pendingPromises.size > 0;
  }

  // ── Execution ─────────────────────────────────────

  /**
   * Drain all command components from the world and dispatch to handlers.
   *
   * For each registered component type, every matching entity has the
   * component removed. If the entity has no components left after removal,
   * it is destroyed. Only then are the handlers invoked, each receiving the
   * command data and the entity it came from.
   *
   * When several handlers are registered for the same component type, the
   * component is read and removed once and the same data object is passed to
   * each of those handlers.
   *
   * If a handler throws, the error propagates to the caller and the commands
   * not yet dispatched in this call are lost, because their components have
   * already been removed.
   *
   * If `isInterrupted` is true, this method is a no-op.
   */
  execute(): void {
    if (this.isInterrupted) return;

    const pending: Pending[] = [];
    // Commands captured during this call, keyed by component definition.
    // Without this, a second handler for the same definition would scan
    // after the first one had already stripped the component and would
    // never be invoked.
    const captured = new Map<
      ComponentDef<any>,
      Array<{ entity: Entity; data: any }>
    >();

    // 1. Snapshot + remove command components
    for (const reg of this._registrations) {
      let commands = captured.get(reg.def);
      if (commands === undefined) {
        commands = [];
        captured.set(reg.def, commands);
        // Copy the query result first: components are removed while walking it.
        for (const entity of [...this._world.query(reg.query)]) {
          const data = this._world.tryGet(entity, reg.def);
          if (data !== undefined) {
            this._world.remove(entity, reg.def);
            commands.push({ entity, data });
          }
        }
      }
      for (const { entity, data } of commands) {
        pending.push({ entity, handler: reg.handler, data });
      }
    }

    // 2. Destroy entities that became empty after command removal.
    //    The isAlive check also skips entities already destroyed by an
    //    earlier entry for the same entity.
    for (const { entity } of pending) {
      if (
        this._world.isAlive(entity) &&
        !this._world.hasAnyComponent(entity)
      ) {
        this._world.destroy(entity);
      }
    }

    // 3. Dispatch handlers (after cleanup so handlers see consistent state)
    for (const { handler, data, entity } of pending) {
      handler(data, entity);
    }
  }
}