diff --git a/lib/ProcessingPriorityQueue.ts b/lib/ProcessingPriorityQueue.ts index e6b3759..c76942c 100644 --- a/lib/ProcessingPriorityQueue.ts +++ b/lib/ProcessingPriorityQueue.ts @@ -174,7 +174,7 @@ export default class ProcessingPriorityQueue { this.process(); } - if (this.currentConcurrencyCount <= this.concurrencyLimit && this.priorityQueue.size > 0) { + if (this.currentConcurrencyCount < this.concurrencyLimit && this.priorityQueue.size > 0) { this.currentConcurrencyCount++; const {value: sqItem, valid} = this.poll(); diff --git a/tests/unit/PTask.test.ts b/tests/unit/PTask.test.ts index 3b16ef4..43df319 100644 --- a/tests/unit/PTask.test.ts +++ b/tests/unit/PTask.test.ts @@ -685,4 +685,38 @@ describe("PriorityTask", () => { }); expect(PTask.getAllPTasks("krombopulos").length).toBe(1); }); + + it("should not schedule more than concurrencyLimit items", (done) => { + const CONCURRENCY_LIMIT = 2; + PTask.setConcurrencyLimit(CONCURRENCY_LIMIT); + + const delayedOnRun = async (a: number) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + return a; + }; + + // Prepare tasks + const ptasks = Array.from({ length: CONCURRENCY_LIMIT + 1 }).map((_, i) => { + return new PTask({ + priority: i, + args: i, + onRun: delayedOnRun, + }); + }); + + // run all tasks + const promises = ptasks.map((ptask) => ptask.run()); + + setTimeout(() => { + const runningTasks = PTask.getAllPTasks().filter((ptask) => ptask.status === "running"); + const pendingTasks = PTask.getAllPTasks().filter((ptask) => ptask.status === "pending"); + + expect(runningTasks.length).toBe(CONCURRENCY_LIMIT); + expect(pendingTasks.length).toBe(1); + + Promise.all(promises).then(() => { + done() + }); + }, 10); + }) });