refactor: add a new async queue

This commit is contained in:
hypercross 2026-04-02 10:04:22 +08:00
parent 40788d445d
commit 9c7baa29ef
2 changed files with 136 additions and 0 deletions

32
src/utils/async-queue.ts Normal file
View File

@ -0,0 +1,32 @@
export class AsyncQueue<T> {
private items: T[] = [];
private resolvers: ((value: T) => void)[] = [];
push(item: T): void {
if (this.resolvers.length > 0) {
const resolve = this.resolvers.shift()!;
resolve(item);
} else {
this.items.push(item);
}
}
pushAll(items: Iterable<T>): void {
for (const item of items) {
this.push(item);
}
}
async pop(): Promise<T> {
if (this.items.length > 0) {
return this.items.shift()!;
}
return new Promise<T>((resolve) => {
this.resolvers.push(resolve);
});
}
get length(): number {
return this.items.length - this.resolvers.length;
}
}

View File

@ -0,0 +1,104 @@
import { describe, it, expect } from 'vitest';
import { AsyncQueue } from '../../src/utils/async-queue';
describe('AsyncQueue', () => {
describe('push', () => {
it('should add item to queue', () => {
const queue = new AsyncQueue<number>();
queue.push(1);
expect(queue.length).toBe(1);
});
});
describe('pop', () => {
it('should return item immediately if queue is not empty', async () => {
const queue = new AsyncQueue<number>();
queue.push(1);
queue.push(2);
const result = await queue.pop();
expect(result).toBe(1);
expect(queue.length).toBe(1);
});
it('should wait for item if queue is empty', async () => {
const queue = new AsyncQueue<number>();
const popPromise = queue.pop();
queue.push(42);
const result = await popPromise;
expect(result).toBe(42);
expect(queue.length).toBe(0);
});
it('should return items in FIFO order', async () => {
const queue = new AsyncQueue<number>();
queue.push(1);
queue.push(2);
queue.push(3);
expect(await queue.pop()).toBe(1);
expect(await queue.pop()).toBe(2);
expect(await queue.pop()).toBe(3);
});
});
describe('pushAll', () => {
it('should add all items to queue', async () => {
const queue = new AsyncQueue<number>();
queue.pushAll([1, 2, 3]);
expect(queue.length).toBe(3);
expect(await queue.pop()).toBe(1);
expect(await queue.pop()).toBe(2);
expect(await queue.pop()).toBe(3);
});
it('should accept any iterable', async () => {
const queue = new AsyncQueue<number>();
queue.pushAll(new Set([10, 20, 30]));
expect(await queue.pop()).toBe(10);
expect(await queue.pop()).toBe(20);
expect(await queue.pop()).toBe(30);
});
it('should resolve waiting pops', async () => {
const queue = new AsyncQueue<number>();
const pop1 = queue.pop();
const pop2 = queue.pop();
queue.pushAll([1, 2]);
expect(await pop1).toBe(1);
expect(await pop2).toBe(2);
expect(queue.length).toBe(0);
});
});
describe('length', () => {
it('should return 0 for empty queue', () => {
const queue = new AsyncQueue<number>();
expect(queue.length).toBe(0);
});
it('should reflect pending consumers as negative', () => {
const queue = new AsyncQueue<number>();
queue.pop(); // no await, so it's pending
expect(queue.length).toBe(-1);
});
it('should update after push and pop', async () => {
const queue = new AsyncQueue<number>();
queue.push(1);
queue.push(2);
expect(queue.length).toBe(2);
await queue.pop();
expect(queue.length).toBe(1);
});
});
});