1 <?php
2 /**
3 * Copyright 2012-2014 Rackspace US, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 namespace OpenCloud\ObjectStore\Upload;
19
20 use Guzzle\Http\EntityBody;
21 use Guzzle\Http\ReadLimitEntityBody;
22
23 /**
24 * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
25 * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
26 * dividing the total size by the individual part size.
27 *
28 * @codeCoverageIgnore
29 */
class ConcurrentTransfer extends AbstractTransfer
{
    /**
     * Uploads the entity body in batches of concurrently-sent parts until the transfer state holds every part.
     *
     * Each pass of the outer loop queues at most one request per worker, sends the whole batch through the
     * client, and records one TransferPart per response returned.
     *
     * @throws \RuntimeException If a pass is unable to queue a single request, because the transfer state could
     *                           then never reach the expected number of parts
     */
    public function transfer()
    {
        $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);

        // collectParts() always returns at least one body, so never run with fewer than one worker. Without this
        // floor, a concurrency option of zero (or less) would queue nothing and the loop below would never end.
        $workers = max(1, min($totalParts, $this->options['concurrency']));
        $parts   = $this->collectParts($workers);

        while ($this->transferState->count() < $totalParts) {
            $completedParts = $this->transferState->count();
            $requests       = array();

            // Give each worker the next outstanding part, stopping once every remaining part has been assigned
            for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
                // Zero-based index of the part handled by this worker in the current pass
                $partIndex = $completedParts + $i;

                // Offset is the part index multiplied by the standard chunk length
                $parts[$i]->setOffset($partIndex * $this->partSize);

                // If this segment reports no content, there is nothing for this (or any later) worker to send
                if ($parts[$i]->getContentLength() == 0) {
                    break;
                }

                // Part numbers passed to the request are one-based and derived from the same index as the offset
                $requests[] = TransferPart::createRequest(
                    $parts[$i],
                    $partIndex + 1,
                    $this->client,
                    $this->options
                );
            }

            // Sending an empty batch would add no parts to the transfer state, so the loop condition would never
            // change; fail loudly instead of spinning forever.
            if (count($requests) === 0) {
                throw new \RuntimeException(sprintf(
                    'Unable to queue any upload parts: %d of %d parts completed',
                    $completedParts,
                    $totalParts
                ));
            }

            // Send the queued requests as one batch and record a part for every response
            foreach ($this->client->send($requests) as $response) {
                $this->transferState->addPart(TransferPart::fromResponse($response));
            }
        }
    }

    /**
     * Builds one ReadLimitEntityBody per worker, each with its read limit fixed to this object's partSize value.
     *
     * The first element wraps the original entity body; every additional worker gets a freshly opened stream on
     * the same URI. The returned array always contains at least one element, even when $workers is below one.
     *
     * @param int $workers The total number of workers
     * @return array The worker array, keyed from 0
     */
    private function collectParts($workers)
    {
        $uri = $this->entityBody->getUri();

        $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));

        for ($i = 1; $i < $workers; $i++) {
            // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses. The stream is opened
            // with the binary flag so the uploaded bytes are read untranslated on every platform.
            $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'rb')), $this->partSize);
        }

        return $array;
    }
}
92