1 <?php
2 /**
3 * Copyright 2012-2014 Rackspace US, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 namespace OpenCloud\Queues\Resource;
19
20 use Guzzle\Http\Url;
21 use OpenCloud\Common\Collection\PaginatedIterator;
22 use OpenCloud\Common\Http\Message\Formatter;
23 use OpenCloud\Common\Resource\PersistentResource;
24 use OpenCloud\Queues\Exception;
25 use OpenCloud\Queues\Collection\MessageIterator;
26 use OpenCloud\Common\Metadata;
27
28 /**
29 * A queue holds messages. Ideally, a queue is created per work type. For example,
30 * if you want to compress files, you would create a queue dedicated to this job.
31 * Any application that reads from this queue would only compress files.
32 */
33 class Queue extends PersistentResource
34 {
35 /**
36 * Maximum number of messages that can be posted at once.
37 */
38 const MAX_POST_MESSAGES = 10;
39
40 /**
41 * The name given to the queue. The name MUST NOT exceed 64 bytes in length,
42 * and is limited to US-ASCII letters, digits, underscores, and hyphens.
43 *
44 * @var string
45 */
46 private $name;
47
48 /**
49 * Miscellaneous, user-defined information about the queue.
50 *
51 * @var array|Metadata
52 */
53 protected $metadata;
54
55 /**
56 * Populated when the service's listQueues() method is called. Provides a
57 * convenient link for a particular Queue.md.
58 *
59 * @var string
60 */
61 private $href;
62
63 protected static $url_resource = 'queues';
64 protected static $json_collection_name = 'queues';
65 protected static $json_name = false;
66
67 public $createKeys = array('name');
68
69 /**
70 * Set the name (with validation).
71 *
72 * @param $name string
73 * @return $this
74 * @throws \OpenCloud\Queues\Exception\QueueException
75 */
76 public function setName($name)
77 {
78 if (preg_match('#[^\w\d\-\_]+#', $name)) {
79 throw new Exception\QueueException(sprintf(
80 'Queues names are restricted to alphanumeric characters, '
81 . ' hyphens and underscores. You provided: %s',
82 print_r($name, true)
83 ));
84 }
85
86 $this->name = $name;
87
88 return $this;
89 }
90
91 /**
92 * @return string
93 */
94 public function getName()
95 {
96 return $this->name;
97 }
98
99 /**
100 * Save this metadata both to the local object and the API.
101 *
102 * @param array $params
103 * @return mixed
104 */
105 public function saveMetadata(array $params = array())
106 {
107 if (!empty($params)) {
108 $this->setMetadata($params);
109 }
110
111 $json = json_encode((object) $this->getMetadata()->toArray());
112
113 return $this->getClient()->put($this->getUrl('metadata'), self::getJsonHeader(), $json)->send();
114 }
115
116 /**
117 * Retrieve metadata from the API and set it to the local object.
118 */
119 public function retrieveMetadata()
120 {
121 $response = $this->getClient()->get($this->url('metadata'))->send();
122
123 $metadata = new Metadata();
124 $metadata->setArray(Formatter::decode($response));
125 $this->setMetadata($metadata);
126 }
127
128 public function create($params = array())
129 {
130 return $this->getService()->createQueue($params);
131 }
132
133 public function createJson()
134 {
135 return (object) array(
136 'queue_name' => $this->getName(),
137 'metadata' => $this->getMetadata(false)
138 );
139 }
140
141 public function primaryKeyField()
142 {
143 return 'name';
144 }
145
146 public function update($params = array())
147 {
148 return $this->noUpdate();
149 }
150
151 /**
152 * This operation returns queue statistics including how many messages are
153 * in the queue, broken out by status.
154 *
155 * @return object
156 */
157 public function getStats()
158 {
159 $response = $this->getClient()
160 ->get($this->getUrl('stats'))
161 ->send();
162
163 $body = Formatter::decode($response);
164
165 return (!isset($body->messages)) ? false : $body->messages;
166 }
167
168 /**
169 * Gets a message either by a specific ID, or, if no ID is specified, just
170 * an empty Message object.
171 *
172 * @param string|null $id If a string, then the service will retrieve an
173 * individual message based on its specific ID. If NULL, then an empty
174 * object is returned for further use.
175 * @return Message
176 */
177 public function getMessage($id = null)
178 {
179 return $this->getService()->resource('Message', $id, $this);
180 }
181
182 /**
183 * Post an individual message.
184 *
185 * @param array $params
186 * @return bool
187 */
188 public function createMessage(array $params)
189 {
190 return $this->createMessages(array($params));
191 }
192
193 /**
194 * Post multiple messages.
195 *
196 * @param array $messages
197 * @return bool
198 */
199 public function createMessages(array $messages)
200 {
201 $objects = array();
202
203 foreach ($messages as $dataArray) {
204 $objects[] = $this->getMessage($dataArray)->createJson();
205 }
206
207 $json = json_encode(array_slice($objects, 0, self::MAX_POST_MESSAGES));
208 $this->checkJsonError();
209
210 $response = $this->getClient()
211 ->post($this->getUrl('messages'), self::getJsonHeader(), $json)
212 ->send();
213
214 if (null !== ($location = $response->getHeader('Location'))) {
215 $parts = array_merge($this->getUrl()->getParts(), parse_url($location));
216 $url = Url::factory(Url::buildUrl($parts));
217
218 $options = $this->makeResourceIteratorOptions(__NAMESPACE__ . '\\Message') + array(
219 'baseUrl' => $url,
220 'limit.page' => 10
221 );
222
223 return PaginatedIterator::factory($this, $options);
224 }
225
226 return true;
227 }
228
229 /**
230 * Lists messages according to certain filter options. Results are ordered
231 * by age, oldest message first. All of the parameters are optional.
232 *
233 * @param array $options An associative array of filtering parameters:
234 *
235 * - ids (array) A two-dimensional array of IDs to retrieve.
236 *
237 * - claim_id (string) The claim ID to which the message is associated.
238 *
239 * - marker (string) An opaque string that the client can use to request the
240 * next batch of messages. If not specified, the API will return all
241 * messages at the head of the queue (up to limit).
242 *
243 * - limit (integer) A number up to 20 (the default, but is configurable)
244 * queues to return. If not specified, it defaults to 10.
245 *
246 * - echo (bool) Determines whether the API returns a client's own messages,
247 * as determined by the uuid portion of the User-Agent header. If not
248 * specified, echo defaults to FALSE.
249 *
250 * - include_claimed (bool) Determines whether the API returns claimed
251 * messages as well as unclaimed messages. If not specified, defaults
252 * to FALSE (i.e. only unclaimed messages are returned).
253 *
254 * @return \OpenCloud\Queues\Collection\MessageIterator
255 */
256 public function listMessages(array $options = array())
257 {
258 // Implode array into delimeted string if necessary
259 if (isset($options['ids']) && is_array($options['ids'])) {
260 $options['ids'] = implode(',', $options['ids']);
261 }
262
263 // PHP will cast boolean values to integers (true => 1, false => 0) but
264 // the Queues REST API expects strings as query parameters ("true" and "false").
265 foreach ($options as $optionKey => $optionValue) {
266 if (true === $optionValue) {
267 $options[$optionKey] = "true";
268 } elseif (false === $optionValue) {
269 $options[$optionKey] = "false";
270 }
271 }
272
273 $url = $this->getUrl('messages', $options);
274
275 $options = $this->makeResourceIteratorOptions(__NAMESPACE__ . '\\Message') + array(
276 'baseUrl' => $url,
277 'limit.page' => 10
278 );
279
280 return MessageIterator::factory($this, $options);
281 }
282
283 /**
284 * This operation immediately deletes the specified messages, providing a
285 * means for bulk deletes.
286 *
287 * @param array $ids Two-dimensional array of IDs to be deleted
288 * @return boolean
289 */
290 public function deleteMessages(array $ids)
291 {
292 $url = $this->url('messages', array('ids' => implode(',', $ids)));
293 $this->getClient()->delete($url)->send();
294
295 return true;
296 }
297
298 /**
299 * This operation claims a set of messages, up to limit, from oldest to
300 * newest, skipping any that are already claimed. If no unclaimed messages
301 * are available, FALSE is returned.
302 *
303 * You should delete the message when you have finished processing it,
304 * before the claim expires, to ensure the message is processed only once.
305 * Be aware that you must call the delete() method on the Message object and
306 * pass in the Claim ID, rather than relying on the service's bulk delete
307 * deleteMessages() method. This is so that the server can return an error
308 * if the claim just expired, giving you a chance to roll back your processing
309 * of the given message, since another worker will likely claim the message
310 * and process it.
311 *
312 * Just as with a message's age, the age given for the claim is relative to
313 * the server's clock, and is useful for determining how quickly messages are
314 * getting processed, and whether a given message's claim is about to expire.
315 *
316 * When a claim expires, it is removed, allowing another client worker to
317 * claim the message in the case that the original worker fails to process it.
318 *
319 * @param int $limit
320 */
321 public function claimMessages(array $options = array())
322 {
323 $limit = (isset($options['limit'])) ? $options['limit'] : Claim::LIMIT_DEFAULT;
324 $grace = (isset($options['grace'])) ? $options['grace'] : Claim::GRACE_DEFAULT;
325 $ttl = (isset($options['ttl'])) ? $options['ttl'] : Claim::TTL_DEFAULT;
326
327 $json = json_encode((object) array(
328 'grace' => $grace,
329 'ttl' => $ttl
330 ));
331
332 $url = $this->getUrl('claims', array('limit' => $limit));
333
334 $response = $this->getClient()->post($url, self::getJsonHeader(), $json)->send();
335
336 if ($response->getStatusCode() == 204) {
337 return false;
338 }
339
340 $options = array('resourceClass' => 'Message', 'baseUrl' => $url);
341
342 return PaginatedIterator::factory($this, $options, Formatter::decode($response));
343 }
344
345 /**
346 * If an ID is supplied, the API is queried for a persistent object; otherwise
347 * an empty object is returned.
348 *
349 * @param int $id
350 * @return Claim
351 */
352 public function getClaim($id = null)
353 {
354 return $this->getService()->resource('Claim', $id, $this);
355 }
356 }
357