![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/cartforge.co/app/code/Webkul/PrivateShop/Model/Queue/ |
<?php
/**
 * Webkul Software
 *
 * @category  Webkul
 * @package   Webkul_PrivateShop
 * @author    Webkul Software Private Limited
 * @copyright Webkul Software Private Limited (https://webkul.com)
 * @license   https://store.webkul.com/license.html
 */

namespace Webkul\PrivateShop\Model\Queue;

use Webkul\PrivateShop\Api\Data\EnvelopeInterface;
use Webkul\PrivateShop\Api\Data\EnvelopeInterfaceFactory;
use Webkul\PrivateShop\Api\Data\MessageInterface;
use Webkul\PrivateShop\Api\Data\MessageInterfaceFactory;
use Webkul\PrivateShop\Api\Queue\BrokerInterface;
use Webkul\PrivateShop\Api\Queue\MessageRepositoryInterface;

/**
 * Queue broker that delegates every operation to a message repository.
 *
 * Envelopes handed to the broker are converted into queue messages tagged with
 * the configured queue name; messages pulled from the repository are converted
 * back into envelopes that carry the message id as their broker reference.
 *
 * NOTE(review): the class name suggests the repository is MySQL-backed, but the
 * storage engine is decided by the injected repository - confirm in DI config.
 */
class MysqlBroker implements BrokerInterface
{
    /**
     * Creates empty queue message objects.
     *
     * @var MessageInterfaceFactory
     */
    protected $queueMessageFactory;

    /**
     * Creates empty envelope objects.
     *
     * @var EnvelopeInterfaceFactory
     */
    protected $messageEnvelopeFactory;

    /**
     * Repository used to create, fetch, requeue and remove queue messages.
     *
     * @var MessageRepositoryInterface
     */
    protected $queueMessageRepository;

    /**
     * Queue name stamped on every message created by enqueue().
     *
     * @var string|null
     */
    protected $queueName;

    /**
     * Stores the collaborators and the queue name for later use.
     *
     * @param MessageInterfaceFactory    $queueMessageFactory
     * @param EnvelopeInterfaceFactory   $messageEnvelopeFactory
     * @param MessageRepositoryInterface $queueMessageRepository
     * @param string|null                $queueName              Defaults to null when not configured.
     */
    public function __construct(
        MessageInterfaceFactory $queueMessageFactory,
        EnvelopeInterfaceFactory $messageEnvelopeFactory,
        MessageRepositoryInterface $queueMessageRepository,
        $queueName = null
    ) {
        $this->queueName = $queueName;
        $this->queueMessageRepository = $queueMessageRepository;
        $this->messageEnvelopeFactory = $messageEnvelopeFactory;
        $this->queueMessageFactory = $queueMessageFactory;
    }

    /**
     * Turns the envelope into a queue message and passes it to the repository.
     *
     * The new message receives this broker's queue name and the envelope content.
     *
     * @param EnvelopeInterface $messageEnvelope Envelope whose content is queued.
     * @return mixed Whatever the repository's create() call returns.
     * @inheritdoc
     */
    public function enqueue(EnvelopeInterface $messageEnvelope)
    {
        // Each setter's return value is carried forward, exactly as a fluent chain would.
        $record = $this->queueMessageFactory->create();
        $record = $record->setQueueName($this->queueName);
        $record = $record->setMessageContent($messageEnvelope->getContent());

        return $this->queueMessageRepository->create($record);
    }

    /**
     * Asks the repository for its next message and wraps it in an envelope.
     *
     * @return mixed False when the repository yields nothing, or a message without
     *               an id; otherwise the result of the envelope setter chain
     *               (presumably the envelope itself - verify against the setters).
     * @inheritdoc
     */
    public function next()
    {
        $pending = $this->queueMessageRepository->next();

        $isUsable = $pending && $pending->getId();
        if (!$isUsable) {
            return false;
        }

        // The message id doubles as the reference used later by acknowledge()/reject().
        $envelope = $this->messageEnvelopeFactory->create();
        $envelope = $envelope->setBrokerRef($pending->getId());

        return $envelope->setContent($pending->getMessageContent());
    }

    /**
     * Removes the queue message that the envelope's broker reference points to.
     *
     * @param EnvelopeInterface $message Envelope carrying the broker reference.
     * @return void
     * @inheritdoc
     */
    public function acknowledge(EnvelopeInterface $message)
    {
        $this->queueMessageRepository->remove($this->fetchStoredMessage($message));
    }

    /**
     * Requeues or removes the queue message referenced by the envelope.
     *
     * @param EnvelopeInterface $message Envelope carrying the broker reference.
     * @param bool              $requeue Truthy to requeue the message, falsy to remove it.
     * @return void
     * @inheritdoc
     */
    public function reject(EnvelopeInterface $message, $requeue = true)
    {
        $stored = $this->fetchStoredMessage($message);

        if (!$requeue) {
            $this->queueMessageRepository->remove($stored);
            return;
        }

        $this->queueMessageRepository->requeue($stored);
    }

    /**
     * Looks up the repository entry identified by the envelope's broker reference.
     *
     * @param EnvelopeInterface $envelope Envelope carrying the broker reference.
     * @return mixed Whatever the repository's get() call returns.
     */
    private function fetchStoredMessage(EnvelopeInterface $envelope)
    {
        return $this->queueMessageRepository->get($envelope->getBrokerRef());
    }
}