1 <?php
2 /**
3 * Copyright 2012-2014 Rackspace US, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 namespace OpenCloud\Queues;
19
20 use Guzzle\Common\Event;
21 use Guzzle\Http\Exception\BadResponseException;
22 use OpenCloud\Common\Exceptions\InvalidArgumentError;
23 use OpenCloud\Common\Service\CatalogService;
24 use Symfony\Component\EventDispatcher\EventSubscriberInterface;
25
26 /**
27 * Cloud Queues is an open source, scalable, and highly available message and
28 * notifications service. Users of this service can create and manage a
29 * "producer-consumer" or a "publisher-subscriber" model from one simple API. It
30 * is made up of a few basic components: queues, messages, claims, and stats.
31 *
32 * In the producer-consumer model, users create queues where producers
33 * can post messages. Workers, or consumers, may then claim those messages and
34 * delete them once complete. A single claim may contain multiple messages, and
35 * administrators are given the ability to query claims for status.
36 *
37 * In the publisher-subscriber model, messages are posted to a queue like above,
38 * but messages are never claimed. Instead, subscribers, or watchers, simply
39 * send GET requests to pull all messages since their last request. In this
40 * model, a message will remain in the queue, unclaimed, until the message's
41 * time to live (TTL) has expired.
42 *
43 * Here is an overview of the Cloud Queues workflow:
44 *
45 * 1. You create a queue to which producers post messages.
46 *
47 * 2. Producers post messages to the queue.
48 *
49 * 3. Workers claim messages from the queue, complete the work in that message,
50 * and delete the message.
51 *
52 * 4. If a worker plans to be offline before its message completes, the worker
53 * can retire the claim TTL, putting the message back into the queue for
54 * another worker to claim.
55 *
56 * 5. Subscribers monitor the claims of these queues to keep track of activity
57 * and help troubleshoot if things go wrong.
58 *
59 * For the majority of use cases, Cloud Queues itself will not be responsible
60 * for the ordering of messages. If there is only a single producer, however,
61 * Cloud Queueing guarantees that messages are handled in a First In, First Out
62 * (FIFO) order.
63 */
64 class Service extends CatalogService implements EventSubscriberInterface
65 {
66 const DEFAULT_TYPE = 'rax:queues';
67 const DEFAULT_NAME = 'cloudQueues';
68
69 public static function getSubscribedEvents()
70 {
71 return array(
72 'request.before_send' => 'appendClientIdToRequest'
73 );
74 }
75
76 /**
77 * Set the Client-ID header to all requests for this service.
78 *
79 * @param Event $event
80 */
81 public function appendClientIdToRequest(Event $event)
82 {
83 $event['request']->setHeader('Client-ID', $this->getClientId());
84 }
85
86 /**
87 * An arbitrary string used to differentiate your worker/subscriber. This is
88 * needed, for example, when you return back a list of messages and want to
89 * know the ones your worker is processing.
90 *
91 * @var string
92 */
93 private $clientId;
94
95 /**
96 * @param null $clientId
97 * @return $this
98 */
99 public function setClientId($clientId = null)
100 {
101 if (!$clientId) {
102 $clientId = self::generateUuid();
103 }
104 $this->clientId = $clientId;
105
106 return $this;
107 }
108
109 /**
110 * @return string
111 */
112 public function getClientId()
113 {
114 return $this->clientId;
115 }
116
117 /**
118 * Create a new Queue.
119 *
120 * @param $name Name of the new queue
121 * @return Queue
122 */
123 public function createQueue($name)
124 {
125 if (!is_string($name)) {
126 throw new InvalidArgumentError(
127 'The only parameter required to create a Queue is a string name. Metadata can be set with '
128 . 'Queue::setMetadata and Queue::saveMetadata'
129 );
130 }
131
132 $queue = $this->getQueue();
133 $queue->setName($name);
134
135 // send the request
136 $this->getClient()->put($queue->getUrl())->send();
137
138 return $queue;
139 }
140
141 /**
142 * This operation lists queues for the project, sorting the queues
143 * alphabetically by name.
144 *
145 * @param array $params An associative array of optional parameters:
146 *
147 * - marker (string) Specifies the name of the last queue received in a
148 * previous request, or none to get the first page of
149 * results. Optional.
150 *
151 * - limit (integer) Specifies up to 20 (the default, but configurable)
152 * queues to return. Optional.
153 *
154 * - detailed (bool) Determines whether queue metadata is included in the
155 * response. Optional.
156 *
157 * @return Collection
158 */
159 public function listQueues(array $params = array())
160 {
161 return $this->resourceList('Queue', $this->getUrl('queues', $params));
162 }
163
164 /**
165 * Return an empty Queue.md object.
166 *
167 * @return Queue
168 */
169 public function getQueue($id = null)
170 {
171 return $this->resource('Queue', $id);
172 }
173
174 /**
175 * This operation checks to see if the specified queue exists.
176 *
177 * @param string $name The queue name that you want to check
178 * @return bool
179 */
180 public function hasQueue($name)
181 {
182 if (!$name || !is_string($name)) {
183 throw new InvalidArgumentError(sprintf(
184 'You must provide a queue name as a valid string. You provided: %s',
185 print_r($name, true)
186 ));
187 }
188
189 try {
190 $url = $this->getUrl();
191 $url->addPath('queues')->addPath($name);
192
193 $this->getClient()->head($url)->send();
194
195 return true;
196 } catch (BadResponseException $e) {
197 return false;
198 }
199 }
200 }
201