libzypp 17.31.23
rangedownloader_p.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9
14#include <zypp-core/AutoDispose.h>
15#include <zypp-core/fs/PathInfo.h>
16
17#include "rangedownloader_p.h"
18
19namespace zyppng {
20
22 { }
23
24 void RangeDownloaderBaseState::onRequestProgress( NetworkRequest &, off_t , off_t, off_t , off_t )
25 {
26 off_t dlnowMulti = _downloadedMultiByteCount;
27 for( const auto &req : _runningRequests ) {
28 dlnowMulti += req->downloadedByteCount();
29 }
30
31 if ( !assertExpectedFilesize( dlnowMulti ) )
32 return;
33
34 stateMachine()._sigProgress.emit( *stateMachine().z_func(), _fileSize, dlnowMulti );
35 }
36
38 {
39 auto lck = stateMachine().z_func()->shared_from_this();
40 auto it = std::find_if( _runningRequests.begin(), _runningRequests.end(), [ &req ]( const std::shared_ptr<Request> &r ) {
41 return ( r.get() == &req );
42 });
43 if ( it == _runningRequests.end() )
44 return;
45
46 auto reqLocked = *it;
47
48 //remove from running
49 _runningRequests.erase( it );
50
51 //feed the working URL back into the mirrors in case there are still running requests that might fail
52 // @TODO , finishing the transfer might never be called in case of cancelling the request, need a better way to track running transfers
53 if ( reqLocked->_myMirror )
54 reqLocked->_myMirror->finishTransfer( !err.isError() );
55
56 if ( err.isError() ) {
57 return handleRequestError( reqLocked, err );
58 }
59
62 return;
63 }
64
65 MIL << req.nativeHandle() << " " << "Request finished successfully."<<std::endl;
66 const auto &rngs = reqLocked->requestedRanges();
67 std::for_each( rngs.begin(), rngs.end(), [&req]( const auto &b ){ DBG_MEDIA << req.nativeHandle() << " " << "-> Block " << b.start << " finished." << std::endl; } );
68
69 auto restartReqWithBlock = [ this ]( std::shared_ptr<Request> &req, std::vector<Block> &&blocks ) {
70 MIL << req->nativeHandle() << " " << "Reusing Request to download blocks:"<<std::endl;
71 if ( !addBlockRanges( req, std::move( blocks ) ) )
72 return false;
73
74 //this is not a new request, only add to queues but do not connect signals again
75 addNewRequest( req, false );
76 return true;
77 };
78
79 //check if we already have enqueued all blocks if not reuse the request
80 if ( _ranges.size() ) {
81 MIL << req.nativeHandle() << " " << "Reusing to download blocks: "<<std::endl;
82 if ( !restartReqWithBlock( reqLocked, getNextBlocks( reqLocked->url().getScheme() ) ) ) {
83 return setFailed( "Failed to restart request with new blocks." );
84 }
85 return;
86
87 } else {
88 //if we have failed blocks, try to download them with this mirror
89 if ( !_failedRanges.empty() ) {
90
91 auto fblks = getNextFailedBlocks( reqLocked->url().getScheme() );
92 MIL << req.nativeHandle() << " " << "Reusing to download failed blocks: "<<std::endl;
93 if ( !restartReqWithBlock( reqLocked, std::move(fblks) ) ) {
94 return setFailed( "Failed to restart request with previously failed blocks." );
95 }
96 return;
97 }
98 }
99
100 //feed the working URL back into the mirrors in case there are still running requests that might fail
101 _fileMirrors.push_back( reqLocked->_originalUrl );
102
103 // make sure downloads are running, at this point
105 }
106
107 void RangeDownloaderBaseState::handleRequestError( std::shared_ptr<Request> req, const zyppng::NetworkRequestError &err )
108 {
109 bool retry = false;
110 auto &parent = stateMachine();
111
112
113 //Handle the auth errors explicitly, we need to give the user a way to put in new credentials
114 //if we get valid new credentials we can retry the request
116 retry = parent.handleRequestAuthError( req, err );
117 } else {
118
119 //if a error happens during a multi download we try to use another mirror to download the failed block
120 MIL << req->nativeHandle() << " " << "Request failed " << req->extendedErrorString() << "(" << req->url() << ")" << std::endl;
121 if ( req->lastRedirectInfo ().size () )
122 MIL << req->nativeHandle() << " Last redirection target was: " << req->lastRedirectInfo () << std::endl;
123
124 NetworkRequestError dummyErr;
125
126 const auto &fRanges = req->failedRanges();
127 try {
128 std::transform( fRanges.begin(), fRanges.end(), std::back_inserter(_failedRanges), [ &req ]( const auto &r ){
129 Block b = std::any_cast<Block>(r.userData);;
130 b._failedWithErr = req->error();
131 if ( zypp::env::ZYPP_MEDIA_CURL_DEBUG() > 3 )
132 DBG_MEDIA << "Adding failed block to failed blocklist: " << b.start << " " << b.len << " (" << req->error().toString() << " [" << req->error().nativeErrorString()<< "])" << std::endl;
133 return b;
134 });
135
136 // try to fill the open spot right away
138 return;
139
140 } catch ( const zypp::Exception &ex ) {
141 //we just log the exception and fall back to a normal download
142 WAR << "Multipart download failed: " << ex.asString() << std::endl;
143 }
144 }
145
146 //if rety is true we just enqueue the request again, usually this means authentication was updated
147 if ( retry ) {
148 //make sure this request will run asap
149 req->setPriority( parent._defaultSubRequestPriority );
150
151 //this is not a new request, only add to queues but do not connect signals again
152 addNewRequest( req, false );
153 return;
154 }
155
156 //we do not have more mirrors left we can try
157 cancelAll ( err );
158
159 // not all hope is lost, maybe a normal download can work out?
160 // fall back to normal download
161 _sigFailed.emit();
162 }
163
165 {
167 return;
168
169 zypp::OnScopeExit clearFlag( [this]() {
171 });
172
174
175 //check if there is still work to do
176 while ( _ranges.size() || _failedRanges.size() ) {
177
178 // download was already finished
179 if ( _error.isError() )
180 return;
181
182 if ( _runningRequests.size() >= 10 )
183 break;
184
185 // prepareNextMirror will automatically call mirrorReceived() once there is a mirror ready
186 const auto &res = prepareNextMirror();
187 // if mirrors are delayed we stop here, once the mirrors are ready we get called again
189 return;
190 else if ( res == MirrorHandlingStateBase::Failed ) {
192 return;
193 }
194 }
195
196 // check if we are done at this point
197 if ( _runningRequests.empty() ) {
198
199 if ( _failedRanges.size() || _ranges.size() ) {
201 return;
202 }
203
204 // seems we were successfull , transition to finished state
205 setFinished();
206 }
207 }
208
210 {
211
212 auto &parent = stateMachine();
213 Url myUrl;
214 TransferSettings settings;
215
216 auto err = setupMirror( mirror, myUrl, settings );
217 if ( err.isError() ) {
218 WAR << "Failure to setup mirror " << myUrl << " with error " << err.toString() << "("<< err.nativeErrorString() << "), dropping it from the list of mirrors." << std::endl;
219 // if a mirror fails , we remove it from our list
220 _fileMirrors.erase( mirror.first );
221
222 // make sure this is retried
224 return;
225 }
226
227 auto blocks = getNextBlocks( myUrl.getScheme() );
228 if ( !blocks.size() )
229 blocks = getNextFailedBlocks( myUrl.getScheme() );
230
231 if ( !blocks.size() ) {
232 // We have no blocks. In theory, that should never happen, but for safety, we error out here. It is better than
233 // getting stuck.
234 setFailed( NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Mirror requested after all blocks were downloaded." ) );
235 return;
236 }
237
238 const auto &spec = parent._spec;
239
240 std::shared_ptr<Request> req = std::make_shared<Request>( ::internal::clearQueryString( myUrl ), spec.targetPath(), NetworkRequest::WriteShared );
241 req->_myMirror = mirror.second;
242 req->_originalUrl = myUrl;
243 req->setPriority( parent._defaultSubRequestPriority );
244 req->transferSettings() = settings;
245
246 // if we download chunks we do not want to wait for too long on mirrors that have slow activity
247 // note: this sets the activity timeout, not the download timeout
248 req->transferSettings().setTimeout( 2 );
249
250 MIL << "Creating Request to download blocks via mirror: " << myUrl << std::endl;
251 if ( !addBlockRanges( req, std::move(blocks) ) ) {
253 return;
254 }
255
256 // we just use a mirror once per file, remove it from the list
257 _fileMirrors.erase( mirror.first );
258
259 addNewRequest( req );
260
261 // trigger next downloads
263 }
264
266 {
267 // it was impossible to find a new mirror, check if we still have running requests we can wait for, if not
268 // we can only fail at this point
269 if ( !_runningRequests.size() ) {
271 }
272 }
273
275 {
276 bool triggerResched = false;
277 for ( auto &req : _runningRequests ) {
278 if ( req->state() == NetworkRequest::Pending ) {
279 triggerResched = true;
280 req->setPriority( NetworkRequest::Critical, false );
281 }
282 }
283 if ( triggerResched )
284 stateMachine()._requestDispatcher->reschedule();
285 }
286
287 void RangeDownloaderBaseState::addNewRequest(std::shared_ptr<Request> req , const bool connectSignals)
288 {
289 if ( connectSignals )
290 req->connectSignals( *this );
291
292 _runningRequests.push_back( req );
293 stateMachine()._requestDispatcher->enqueue( req );
294
295 if ( req->_myMirror )
296 req->_myMirror->startTransfer();
297 }
298
300 {
301 const off_t expFSize = stateMachine()._spec.expectedFileSize();
302 if ( expFSize > 0 && expFSize < currentFilesize ) {
304 return false;
305 }
306 return true;
307 }
308
312 bool RangeDownloaderBaseState::addBlockRanges ( std::shared_ptr<Request> req , std::vector<Block> &&blocks ) const
313 {
314 req->resetRequestRanges();
315 for ( const auto &block : blocks ) {
316 if ( block.chksumVec && block.chksumtype.size() ) {
317 std::shared_ptr<zypp::Digest> dig = std::make_shared<zypp::Digest>();
318 if ( !dig->create( block.chksumtype ) ) {
319 WAR_MEDIA << "Trying to create Digest with chksum type " << block.chksumtype << " failed " << std::endl;
320 return false;
321 }
322
324 DBG_MEDIA << "Starting block " << block.start << " with checksum " << zypp::Digest::digestVectorToString( *block.chksumVec ) << "." << std::endl;
325 req->addRequestRange( block.start, block.len, dig, *block.chksumVec, std::any( block ), block.chksumCompareLen, block.chksumPad );
326 } else {
327
329 DBG_MEDIA << "Starting block " << block.start << " without checksum." << std::endl;
330 req->addRequestRange( block.start, block.len, {}, {}, std::any( block ) );
331 }
332 }
333 return true;
334 }
335
337 {
338 _error = std::move( err );
339 cancelAll( _error );
340 zypp::filesystem::unlink( stateMachine()._spec.targetPath() );
341 _sigFailed.emit();
342 }
343
344 void RangeDownloaderBaseState::setFailed(std::string &&reason)
345 {
347 }
348
350 {
352 _sigFinished.emit();
353 }
354
356 {
357 while( _runningRequests.size() ) {
358 auto req = _runningRequests.back();
359 req->disconnectSignals();
360 _runningRequests.pop_back();
361 stateMachine()._requestDispatcher->cancel( *req, err );
362 if ( req->_myMirror )
363 req->_myMirror->cancelTransfer();
364 }
365 }
366
367 std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextBlocks( const std::string &urlScheme )
368 {
369 std::vector<Block> blocks;
370 const auto prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
371 size_t accumulatedSize = 0;
372
373 bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
374
375 std::optional<size_t> lastBlockEnd;
376 while ( _ranges.size() && accumulatedSize < prefSize ) {
377 const auto &r = _ranges.front();
378
379 if ( !canDoRandomBlocks && lastBlockEnd ) {
380 if ( static_cast<const size_t>(r.start) != (*lastBlockEnd)+1 )
381 break;
382 }
383
384 lastBlockEnd = r.start + r.len - 1;
385 accumulatedSize += r.len;
386
387 blocks.push_back( std::move( _ranges.front() ) );
388 _ranges.pop_front();
389
390 }
391 DBG_MEDIA << "Accumulated " << blocks.size() << " blocks with accumulated size of: " << accumulatedSize << "." << std::endl;
392 return blocks;
393 }
394
395 std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextFailedBlocks( const std::string &urlScheme )
396 {
397 const auto prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
398 // sort the failed requests by block number, this should make sure get them in offset order as well
399 _failedRanges.sort( []( const auto &a , const auto &b ){ return a.start < b.start; } );
400
401 bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
402
403 std::vector<Block> fblks;
404 std::optional<size_t> lastBlockEnd;
405 size_t accumulatedSize = 0;
406 while ( _failedRanges.size() ) {
407
408 const auto &block =_failedRanges.front();
409
410 //we need to check if we have consecutive blocks because only http mirrors support random request ranges
411 if ( !canDoRandomBlocks && lastBlockEnd ) {
412 if ( static_cast<const size_t>(block.start) != (*lastBlockEnd)+1 )
413 break;
414 }
415
416 lastBlockEnd = block.start + block.len - 1;
417 accumulatedSize += block.len;
418
419 fblks.push_back( std::move( _failedRanges.front() ));
420 _failedRanges.pop_front();
421
422 fblks.back()._retryCount += 1;
423
424 if ( accumulatedSize >= prefSize )
425 break;
426 }
427
428 return fblks;
429 }
430
432 {
433 // this case should never happen because we never start a multi download if we do not know the filesize beforehand
434 if ( filesize == 0 ) return 4 * 1024 * 1024;
435 else if ( filesize < 4*1024*1024 ) return filesize;
436 else if ( filesize < 8*1024*1024 ) return 4*1024*1024;
437 else if ( filesize < 16*1024*1024 ) return 8*1024*1024;
438 else if ( filesize < 256*1024*1024 ) return 10*1024*1024;
439 return 4*1024*1024;
440 }
441}
Store and operate with byte count.
Definition: ByteCount.h:31
static const Unit K
1024 Byte
Definition: ByteCount.h:45
static std::string digestVectorToString(const UByteArray &vec)
get hex string representation of the digest vector given as parameter
Definition: Digest.cc:211
Base class for Exception.
Definition: Exception.h:146
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:75
Holds transfer setting.
void setTimeout(long t)
set the transfer timeout
std::pair< std::vector< Url >::const_iterator, MirrorHandle > MirrorPick
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
The NetworkRequestError class represents an error that occurred in a network request.
Type type() const
type Returns the type of the error
bool isError() const
isError Will return true if this is a actual error
zypp::ByteCount downloadedByteCount() const
Returns the number of already downloaded bytes as reported by the backend.
Definition: request.cc:1313
void * nativeHandle() const
Definition: request.cc:1207
unsigned short a
unsigned short b
#define WAR_MEDIA
Definition: mediadebug_p.h:30
#define DBG_MEDIA
Definition: mediadebug_p.h:28
Url clearQueryString(const Url &url)
Definition: curlhelper.cc:367
const long & ZYPP_MEDIA_CURL_DEBUG()
const long& for setting CURLOPT_DEBUGDATA Returns a reference to a static variable,...
Definition: curlhelper.cc:36
int unlink(const Pathname &path)
Like 'unlink'.
Definition: PathInfo.cc:700
bool hasPrefixCI(const C_Str &str_r, const C_Str &prefix_r)
Definition: String.h:1030
NetworkRequestError setupMirror(const MirrorControl::MirrorPick &pick, Url &url, TransferSettings &set)
std::vector< Block > getNextBlocks(const std::string &urlScheme)
void onRequestProgress(NetworkRequest &, off_t, off_t, off_t, off_t)
bool addBlockRanges(std::shared_ptr< Request > req, std::vector< Block > &&blocks) const
Just initializes the request's ranges from the internal block list.
void onRequestFinished(NetworkRequest &req, const NetworkRequestError &err)
void onRequestStarted(NetworkRequest &)
void handleRequestError(std::shared_ptr< Request > req, const zyppng::NetworkRequestError &err)
std::vector< std::shared_ptr< Request > > _runningRequests
bool assertExpectedFilesize(off_t currentFilesize)
std::vector< Block > getNextFailedBlocks(const std::string &urlScheme)
static zypp::ByteCount makeBlksize(size_t filesize)
void mirrorReceived(MirrorControl::MirrorPick mirror) override
void cancelAll(const NetworkRequestError &err)
void addNewRequest(std::shared_ptr< Request > req, const bool connectSignals=true)
void setFailed(NetworkRequestError &&err)
#define MIL
Definition: Logger.h:96
#define WAR
Definition: Logger.h:97