Skip to content

Commit

Permalink
Fixed default publisher in failed_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
munir131 committed Sep 13, 2019
1 parent ae6fcb4 commit 196a3c6
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/Jobs/PubSubJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class PubSubJob extends Job implements JobContract
* @param string $connectionName
* @param string $queue
*/
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $queue, $subscriber = null)
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $subscriberName, $subscriber = null)
{
$this->pubsub = $pubsub;
$this->job = $job;
$this->queue = $queue;
$this->queue = $subscriberName;
$this->container = $container;
$this->connectionName = $connectionName;
$this->subscriber = $subscriber;
Expand Down
37 changes: 25 additions & 12 deletions src/PubSubQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ public function push($job, $data = '', $subscriber = null)
*
* @return array
*/
public function pushRaw($payload, $subscriber = null, array $options = [])
public function pushRaw($payload, $topic = null, array $options = [])
{
$topic = $this->getTopic($subscriber, true);
$topic = $this->getTopic($topic);

$publish = ['data' => $payload];

Expand Down Expand Up @@ -158,8 +158,7 @@ public function later($delay, $job, $data = '', $subscriber = null)
public function pop($subscriber = null)
{
$this->subscriber = $subscriber;
$topic = $this->getTopic($this->getQueue($subscriber));

$topic = $this->getTopicFromSubscriber($subscriber);
$subscription = $topic->subscription($subscriber);
$messages = $subscription->pull([
'returnImmediately' => true,
Expand Down Expand Up @@ -220,12 +219,13 @@ public function acknowledge(Message $message, $queue = null)
*
* @return mixed
*/
public function acknowledgeAndPublish(Message $message, $queue = null, $options = [], $delay = 0)
public function acknowledgeAndPublish(Message $message, $topic = null, $options = [], $delay = 0)
{
if (isset($options['attempts'])) {
$options['attempts'] = (string) $options['attempts'];
}
$topic = $this->getTopic($this->getQueue($queue));
$topic = $this->getTopic($topic);

$subscription = $topic->subscription($this->subscriber);

$subscription->acknowledge($message);
Expand Down Expand Up @@ -276,13 +276,26 @@ protected function createPayloadArray($job, $data = '')
* Get the current topic.
*
* @param string $queue
* @param string $create
*
* @return \Google\Cloud\PubSub\Topic
*/
public function getTopic($queue, $create = false)
public function getTopic($topic)
{
$topic = $this->pubsub->topic($topic);

return $topic;
}

/**
* Get the current topic from subscriber.
*
* @param string $queue
*
* @return \Google\Cloud\PubSub\Topic
*/
public function getTopicFromSubscriber($subscriber)
{
$queue = $this->getQueue($queue);
$queue = $this->getQueue($subscriber);
$topic = $this->pubsub->topic($queue);

return $topic;
Expand Down Expand Up @@ -330,10 +343,10 @@ public function getPubSub()
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
public function getQueue($subscriberName)
{
if ($this->config && $this->config['subscribers'] && $queue && isset($this->config['subscribers'][$queue])) {
return $this->config['subscribers'][$queue];
if ($this->config && $this->config['subscribers'] && $subscriberName && isset($this->config['subscribers'][$subscriberName])) {
return $this->config['subscribers'][$subscriberName];
}
return $this->default;
}
Expand Down
8 changes: 7 additions & 1 deletion tests/Unit/Connectors/PubSubConnectorTests.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ public function testConnectReturnsPubSubQueueInstance()
private function createFakeConfig()
{
return [
'queue' => 'test',
'queue' => 'test_pub',
'project_id' => 'the-project-id',
'retries' => 1,
'request_timeout' => 60,
'subscribers' => [
'test_sub' => 'test_pub',
'sub1' => 'topic1',
'sub2' => 'topic2',
'sub3' => 'topic1',
]
];
}
}
55 changes: 27 additions & 28 deletions tests/Unit/PubSubQueueTests.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public function setUp()
$this->subscription = $this->createMock(Subscription::class);
$this->message = $this->createMock(Message::class);
$this->config = [
'queue' => 'test',
'queue' => 'test_pub',
'project_id' => 'the-project-id',
'retries' => 1,
'request_timeout' => 60,
'subscribers' => [
'test_sub' => 'test_pub',
'sub1' => 'topic1',
'sub2' => 'topic2',
'sub3' => 'topic1',
Expand All @@ -45,6 +46,7 @@ public function setUp()
->setMethods([
'pushRaw',
'getTopic',
'getTopicFromSubscriber',
'exists',
'subscription',
'availableAt',
Expand Down Expand Up @@ -101,6 +103,7 @@ public function testLater()
$job = 'test';
$delay = 60;
$delay_timestamp = Carbon::now()->addSeconds($delay)->getTimestamp();
$delay_timestamp_str = (string) $delay_timestamp;

$this->queue->method('availableAt')
->willReturn($delay_timestamp);
Expand All @@ -111,8 +114,8 @@ public function testLater()
->with(
$this->isType('string'),
$this->anything(),
$this->callback(function ($options) use ($delay_timestamp) {
if (! isset($options['available_at']) || $options['available_at'] !== $delay_timestamp) {
$this->callback(function ($options) use ($delay_timestamp_str) {
if (! isset($options['available_at']) || $options['available_at'] !== $delay_timestamp_str) {
return false;
}

Expand All @@ -137,9 +140,12 @@ public function testPopWhenJobsAvailable()
$this->queue->method('getTopic')
->willReturn($this->topic);

$this->queue->method('getTopicFromSubscriber')
->willReturn($this->topic);

$this->queue->setContainer($this->createMock(Container::class));

$this->assertTrue($this->queue->pop('test') instanceof PubSubJob);
$this->assertTrue($this->queue->pop('test_sub') instanceof PubSubJob);
}

public function testPopWhenNoJobAvailable()
Expand All @@ -156,7 +162,10 @@ public function testPopWhenNoJobAvailable()
$this->queue->method('getTopic')
->willReturn($this->topic);

$this->assertTrue(is_null($this->queue->pop('test')));
$this->queue->method('getTopicFromSubscriber')
->willReturn($this->topic);

$this->assertTrue(is_null($this->queue->pop('test_sub')));
}

public function testPopWhenTopicDoesNotExist()
Expand All @@ -167,7 +176,13 @@ public function testPopWhenTopicDoesNotExist()
$this->topic->method('exists')
->willReturn(false);

$this->assertTrue(is_null($this->queue->pop('test')));
$this->queue->method('getTopicFromSubscriber')
->willReturn($this->topic);

$this->topic->method('subscription')
->willReturn($this->subscription);

$this->assertTrue(is_null($this->queue->pop('test_sub')));
}

public function testBulk()
Expand All @@ -179,6 +194,9 @@ public function testBulk()
$this->queue->method('getTopic')
->willReturn($this->topic);

$this->queue->method('getTopicFromSubscriber')
->willReturn($this->topic);

$this->queue->method('subscribeToTopic')
->willReturn($this->subscription);

Expand All @@ -204,6 +222,7 @@ public function testAcknowledgeAndPublish()
$options = ['foo' => 'bar'];
$delay = 60;
$delay_timestamp = Carbon::now()->addSeconds($delay)->getTimestamp();
$delay_timestamp_str = (string) $delay_timestamp;

$this->subscription->expects($this->once())
->method('acknowledge');
Expand All @@ -221,12 +240,12 @@ public function testAcknowledgeAndPublish()
->method('publish')
->willReturn($this->result)
->with(
$this->callback(function ($message) use ($options, $delay_timestamp) {
$this->callback(function ($message) use ($options, $delay_timestamp_str) {
if (! isset($message['attributes'])) {
return false;
}

if (! isset($message['attributes']['available_at']) || $message['attributes']['available_at'] !== $delay_timestamp) {
if (! isset($message['attributes']['available_at']) || $message['attributes']['available_at'] !== $delay_timestamp_str) {
return false;
}

Expand Down Expand Up @@ -257,26 +276,6 @@ public function testGetTopic()
$this->assertTrue($queue->getTopic('test') instanceof Topic);
}

public function testCreateTopicAndReturnIt()
{
$this->topic->method('exists')
->willReturn(false);

$this->topic->expects($this->once())
->method('create')
->willReturn(true);

$this->client->method('topic')
->willReturn($this->topic);

$queue = $this->getMockBuilder(PubSubQueue::class)
->setConstructorArgs([$this->client, 'default', $this->config])
->setMethods()
->getMock();

$this->assertTrue($queue->getTopic('test', true) instanceof Topic);
}

public function testSubscribtionIsCreated()
{
$this->topic->method('subscription')
Expand Down

0 comments on commit 196a3c6

Please sign in to comment.