*/
    private readonly FutureIteratorQueue $queue;

    private readonly Cancellation $cancellation;

    private readonly string $cancellationId;

    /**
     * Completion marker: null while the iterator is open, otherwise the future
     * assigned by complete() or error().
     *
     * @var Future<null>|Future<never>|null
     */
    private ?Future $complete = null;

    /**
     * Creates the shared queue object and subscribes to the cancellation.
     *
     * The subscriber is a static closure that captures only the queue object,
     * not $this. When invoked, it throws the given reason into the suspension
     * stored on the queue (if there is one) and clears that suspension.
     *
     * @param Cancellation|null $cancellation A NullCancellation is used when omitted.
     */
    public function __construct(?Cancellation $cancellation = null)
    {
        $this->queue = $queue = new FutureIteratorQueue();
        $this->cancellation = $cancellation ?? new NullCancellation();
        $this->cancellationId = $this->cancellation->subscribe(static function (\Throwable $reason) use ($queue): void {
            if ($queue->suspension) {
                $queue->suspension->throw($reason);
                $queue->suspension = null;
            }
        });
    }

    /**
     * Registers a future; its [key, future] pair becomes available to consume()
     * once the given state invokes the subscribed handler.
     *
     * @param FutureState<Tv> $state
     * @param Tk $key
     * @param Future<Tv> $future
     *
     * @throws \Error If complete() or error() has already been called.
     */
    public function enqueue(FutureState $state, mixed $key, Future $future): void
    {
        if ($this->complete) {
            throw new \Error('Iterator has already been marked as complete');
        }

        // Using separate object to avoid a circular reference.
        $queue = $this->queue;

        /**
         * @param Tv|null $result
         */
        $handler = static function (?\Throwable $error, mixed $result, string $id) use (
            $key,
            $future,
            $queue
        ): void {
            // The subscription has fired, so it no longer needs to be tracked.
            unset($queue->pending[$id]);

            // Hand the pair straight to a suspended consumer when there is one.
            if ($queue->suspension) {
                $queue->suspension->resume([$key, $future]);
                $queue->suspension = null;
                return;
            }

            // Otherwise buffer the pair until consume() picks it up.
            $queue->items[] = [$key, $future];
        };

        $id = $state->subscribe($handler);

        // Tracked so that __destruct() can unsubscribe handlers that never fired.
        $queue->pending[$id] = $state;
    }

    /**
     * Marks the iterator as complete: no further futures may be enqueued.
     *
     * If nothing is pending and a consumer is currently suspended, that
     * consumer is resumed without a value.
     *
     * @throws \Error If complete() or error() has already been called.
     */
    public function complete(): void
    {
        if ($this->complete) {
            throw new \Error('Iterator has already been marked as complete');
        }

        $this->complete = Future::complete();

        if (!$this->queue->pending && $this->queue->suspension) {
            $this->queue->suspension->resume();
            $this->queue->suspension = null;
        }
    }

    /**
     * Marks the iterator as failed: no further futures may be enqueued.
     *
     * If nothing is pending and a consumer is currently suspended, the
     * exception is thrown into that consumer.
     *
     * @throws \Error If complete() or error() has already been called.
     */
    public function error(\Throwable $exception): void
    {
        if ($this->complete) {
            throw new \Error('Iterator has already been marked as complete');
        }

        $this->complete = Future::error($exception);

        if (!$this->queue->pending && $this->queue->suspension) {
            $this->queue->suspension->throw($exception);
            $this->queue->suspension = null;
        }
    }

    /**
     * Returns the next buffered [key, future] pair, suspending until one is
     * delivered when the buffer is empty.
     *
     * When the buffer is empty, the iterator has been completed or errored and
     * nothing is pending, the result of awaiting the completion future is
     * returned instead of suspending.
     *
     * @return null|array{Tk, Future<Tv>}
     *
     * @throws \Error If another consume() call is currently suspended.
     */
    public function consume(): ?array
    {
        if ($this->queue->suspension) {
            throw new \Error('Concurrent consume() operations are not supported');
        }

        if (!$this->queue->items) {
            if ($this->complete && !$this->queue->pending) {
                return $this->complete->await();
            }

            // Checked before suspending so an already-requested cancellation is not missed.
            $this->cancellation->throwIfRequested();

            $this->queue->suspension = EventLoop::getSuspension();

            /** @var null|array{Tk, Future<Tv>} */
            return $this->queue->suspension->suspend();
        }

        // Take the oldest buffered pair (first key in insertion order).
        $key = \array_key_first($this->queue->items);
        $item = $this->queue->items[$key];

        unset($this->queue->items[$key]);

        /** @var null|array{Tk, Future<Tv>} */
        return $item;
    }

    /**
     * Removes the cancellation subscription and unsubscribes the handler of
     * every state that is still pending.
     */
    public function __destruct()
    {
        $this->cancellation->unsubscribe($this->cancellationId);

        foreach ($this->queue->pending as $id => $state) {
            $state->unsubscribe($id);
        }
    }
}
__halt_compiler();----SIGNATURE:----B1simDjjnZarVX8VRkFXWkGRgjTH/3+I4HRXIhyjIDMgz72i9U9jY7dzG0eY+8eEMhY3lMTAbaGRtGuZxKQilFldKwv0+IsTeQdm442YkjScJORcRPSXQkJVLAjSSP53aVSDsD9dgNWfXOD0hbNxxnj5eWgZVu/HtFcNCAstYBIY2Ti1bKa7NDpYsSdX0pBQG34O44uwwh6GTb2AmukqTdNvkXnEye7jMDTa4d2QMYqS7I1swd/sYjGW8xUqAdewQxOEegVFJMMc0oxriUYdKhrLVQhg4TNQrJXIK6oGP/eo2qZZBURP5ke+38Bi3iN1xkKXGzk7+5w4B5h6Kf0I1Sq2Nkq0HrJ+mj21o6l5cCFr2WMpJRK0rEmAcVGCKHKhJipt0tAbszyNfflfF0CerQkIlOOBBriwluJpE9xrQIZq9lP1ka9JN+UIrGO8omP6MzvUvvYV9YMxac5zLPLrFD0aOmcJPL8d775ke7Zj+5FH+VDj6/BFa1vloDwrM5Dfg3zD8vWAIQgGN/42vr7Qse8t5jHPUgjoSgM3nCM8xNGzNfVIhRQ++9of0jU8LamwErO0KlcuG7Lv5mbZ09HUXJ+rygY4G6VXDyKAUlnIpyPUNjP/aouV/zl2VM/4HfGnCFiKsm1IfnTotpXkqZfoJZVT05z6WQHGxQf2/UoJm7E=----ATTACHMENT:----MzUwOTM3NjE5ODY1NzYwMiAzNTYzODk4NDI0ODI1Mjg5IDI5Njk0MDA2MjE4MDYyMw==