diff --git a/src/nsqphp/Exception/ExpiredMessageException.php b/src/nsqphp/Exception/ExpiredMessageException.php new file mode 100644 index 0000000..e4debc7 --- /dev/null +++ b/src/nsqphp/Exception/ExpiredMessageException.php @@ -0,0 +1,7 @@ +delay = (int) $delay; + } + + public function getDelay() + { + return $this->delay; + } +} diff --git a/src/nsqphp/nsqphp.php b/src/nsqphp/nsqphp.php index 59043a4..2cb0e53 100644 --- a/src/nsqphp/nsqphp.php +++ b/src/nsqphp/nsqphp.php @@ -427,6 +427,16 @@ public function readAndDispatchMessage($socket, $topic, $channel, $callback) } else { try { call_user_func($callback, $msg); + } catch (Exception\ExpiredMessageException $e) { + // expired message + if ($this->logger) { + $this->logger->info(sprintf( + 'Expired message [%s] "%s": %s', + (string)$connection, + $msg->getId(), + $e->getMessage() + )); + } } catch (\Exception $e) { // erase knowledge of this msg from dedupe if ($this->dedupe !== NULL) { @@ -436,20 +446,29 @@ public function readAndDispatchMessage($socket, $topic, $channel, $callback) if ($this->logger) { $this->logger->warn(sprintf('Error processing [%s] "%s": %s', (string)$connection, $msg->getId(), $e->getMessage())); } + + $requeue = false; + // explicit requeuing + if ($e instanceof Exception\RequeueMessageException) { + $requeue = true; + $requeueDelay = $e->getDelay(); + } // requeue message according to backoff strategy; continue - if ($this->requeueStrategy !== NULL - && ($delay = $this->requeueStrategy->shouldRequeue($msg)) !== NULL) { + else if ($this->requeueStrategy !== NULL + && ($requeueDelay = $this->requeueStrategy->shouldRequeue($msg)) !== NULL) { + $requeue = true; + } + if ($requeue) { // requeue if ($this->logger) { - $this->logger->debug(sprintf('Requeuing [%s] "%s" with delay "%s"', (string)$connection, $msg->getId(), $delay)); + $this->logger->debug(sprintf('Requeuing [%s] "%s" with delay "%s"', (string)$connection, $msg->getId(), $requeueDelay)); } - $connection->write($this->writer->requeue($msg->getId(), $delay)); + $connection->write($this->writer->requeue($msg->getId(), $requeueDelay)); $connection->write($this->writer->ready(1)); return; - } else { - if ($this->logger) { - $this->logger->debug(sprintf('Not requeuing [%s] "%s"', (string)$connection, $msg->getId())); - } + } + if ($this->logger) { + $this->logger->debug(sprintf('Not requeuing [%s] "%s"', (string)$connection, $msg->getId())); } } }