libzypp 17.31.23
provide.cc
Go to the documentation of this file.
1#include "private/provide_p.h"
5#include <zypp-core/zyppng/io/IODevice>
6#include <zypp-core/Url.h>
7#include <zypp-core/base/DtorReset>
8#include <zypp-core/fs/PathInfo.h>
9#include <zypp-media/MediaException>
10#include <zypp-media/FileCheckException>
11#include <zypp-media/CDTools>
12
13// required to generate uuids
14#include <glib.h>
15
16
17L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_PROVIDER_DEBUG)
18
19namespace zyppng {
20
22 : BasePrivate(pub)
23 , _workDir( std::move(workDir) )
24 , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
25 {
26 if ( _workDir.empty() ) {
28 } else {
30 }
31
32 MIL << "Provider workdir is: " << _workDir << std::endl;
33
34 _scheduleTrigger->setSingleShot(true);
35 Base::connect( *_scheduleTrigger, &Timer::sigExpired, *this, &ProvidePrivate::doSchedule );
36 }
37
39 {
40 if ( provideDebugEnabled () ) {
41 std::string_view reasonStr;
42 switch( reason ) {
43 case ProvideStart:
44 reasonStr = "ProvideStart";
45 break;
46 case QueueIdle:
47 reasonStr = "QueueIdle";
48 break;
49 case EnqueueItem:
50 reasonStr = "EnqueueItem";
51 break;
52 case EnqueueReq:
53 reasonStr = "EnqueueReq";
54 break;
55 case FinishReq:
56 reasonStr = "FinishReq";
57 break;
58 case RestartAttach:
59 reasonStr = "RestartAttach";
60 break;
61 }
62 DBG << "Triggering the schedule timer (" << reasonStr << ")" << std::endl;
63 }
64
65 // we use a single shot timer that instantly times out when the event loop is entered the next time
66 // this way we compress many schedule requests that happen during a eventloop run into one
67 _scheduleTrigger->start(0);
68 }
69
70 void ProvidePrivate::doSchedule ( zyppng::Timer & )
71 {
72 if ( !_isRunning )
73 return;
74
75 if ( _isScheduling ) {
76 DBG_PRV << "Scheduling triggered during scheduling, returning immediately." << std::endl;
77 return;
78 }
79
80 const int cpuLimit =
81#ifdef _SC_NPROCESSORS_ONLN
82 sysconf(_SC_NPROCESSORS_ONLN) * 2;
83#else
84 DEFAULT_CPU_WORKERS;
85#endif
86
87 // helper lambda to find the worker that is idle for the longest time
88 constexpr auto findLaziestWorker = []( const auto &workerQueues, const auto &idleNames ) {
89 auto candidate = workerQueues.end();
90 ProvideQueue::TimePoint candidateIdleSince = ProvideQueue::TimePoint::max();
91
92 //find the worker thats idle the longest
93 for ( const auto &name : idleNames ) {
94 auto thisElem = workerQueues.find(name);
95 if ( thisElem == workerQueues.end() )
96 continue;
97
98 const auto idleS = thisElem->second->idleSince();
99 if ( idleS
100 && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
101 candidateIdleSince = *idleS;
102 candidate = thisElem;
103 }
104 }
105
106 if ( candidate != workerQueues.end() )
107 MIL_PRV << "Found idle worker:" << candidate->first << " idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
108
109 return candidate;
110 };
111
112 // clean up old media
113
114 for ( auto iMedia = _attachedMediaInfos.begin(); iMedia != _attachedMediaInfos.end(); ) {
115 if ( iMedia->_refCount > 0 ) {
116 MIL_PRV << "Not releasing media " << iMedia->_name << " refcount is not zero" << std::endl;
117 ++iMedia;
118 continue;
119 }
120 if ( iMedia->_workerType == ProvideQueue::Config::Downloading ) {
121 // we keep the information around for an hour so we do not constantly download the media files for no reasonDD
122 if ( std::chrono::steady_clock::now() - iMedia->_idleSince >= std::chrono::hours(1) ) {
123 MIL << "Detaching medium " << iMedia->_name << " for baseUrl " << iMedia->_attachedUrl << std::endl;
124 iMedia = _attachedMediaInfos.erase(iMedia);
125 continue;
126 } else {
127 MIL_PRV << "Not releasing media " << iMedia->_name << " downloading worker and not timed out yet." << std::endl;
128 }
129 } else {
130 // mounting handlers, we need to send a request to the workers
131 auto bQueue = iMedia->_backingQueue.lock();
132 if ( bQueue ) {
133 zypp::Url url = iMedia->_attachedUrl;
134 url.setScheme( url.getScheme() + std::string( constants::ATTACHED_MEDIA_SUFFIX) );
135 url.setAuthority( iMedia->_name );
136 const auto &req = ProvideRequest::createDetach( url );
137 if ( req ) {
138 MIL << "Detaching medium " << iMedia->_name << " for baseUrl " << iMedia->_attachedUrl << std::endl;
139 bQueue->enqueue ( *req );
140 iMedia = _attachedMediaInfos.erase(iMedia);
141 continue;
142 } else {
143 ERR << "Could not send detach request, creating the request failed" << std::endl;
144 }
145 } else {
146 ERR << "Could not send detach request since no backing queue was defined" << std::endl;
147 }
148 }
149 ++iMedia;
150 }
151
152 zypp::DtorReset schedFlag( _isScheduling, false );
153 _isScheduling = true;
154
155 const auto schedStart = std::chrono::steady_clock::now();
156 MIL_PRV << "Start scheduling" << std::endl;
157
158 zypp::OnScopeExit deferExitMessage( [&](){
159 const auto dur = std::chrono::steady_clock::now() - schedStart;
160 MIL_PRV << "Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
161 });
162
163 // bump inactive items
164 for ( auto it = _items.begin (); it != _items.end(); ) {
165 // was maybe released during scheduling
166 if ( !(*it) )
167 it = _items.erase(it);
168 else {
169 auto &item = *it;
170 if ( item->state() == ProvideItem::Uninitialized ) {
171 item->initialize();
172 }
173 it++;
174 }
175 }
176
177 // we are scheduling now, everything that triggered the timer until now we can forget about
178 _scheduleTrigger->stop();
179
180 for( auto queueIter = _queues.begin(); queueIter != _queues.end(); queueIter ++ ) {
181
182 const auto &scheme = queueIter->_schemeName;
183 auto &queue = queueIter->_requests;
184
185 if ( !queue.size() )
186 continue;
187
188 const auto &configOpt = schemeConfig ( scheme );
189
190 MIL_PRV << "Start scheduling for scheme:" << scheme << " queue size is: " << queue.size() << std::endl;
191
192 if ( !configOpt ) {
193 // FAIL all requests in this queue
194 ERR << "Scheme: " << scheme << " failed to return a valid configuration." << std::endl;
195
196 while( queue.size() ) {
197 auto item = std::move( queue.front() );
198 queue.pop_front();
199 if ( item->owner() )
200 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to query scheme config.")) );
201 }
202
203 continue;
204 }
205
206 // the scheme config that defines how we schedule requests on this set of queues
207 const auto &config = configOpt.get();
208 const auto isSingleInstance = ( (config.cfg_flags() & ProvideQueue::Config::SingleInstance) == ProvideQueue::Config::SingleInstance );
209 if ( config.worker_type() == ProvideQueue::Config::Downloading && !isSingleInstance ) {
210
211 for( auto i = queue.begin (); i != queue.end(); ) {
212
213 // this is the only place where we remove elements from the queue when the scheduling flag is active
214 // other code just nulls out requests in the queue if during scheduling items need to be removed
215 while ( i != queue.end() && !(*i) ) {
216 i = queue.erase(i);
217 }
218
219 if ( i == queue.end() )
220 break;
221
222 ProvideRequestRef item = *i;
223
224 // Downloading queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
225 // If we hit this code path, its a bug
226 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
227 i = queue.erase(i);
228 if ( item->owner() )
229 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
230 continue;
231 }
232
233 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
234
235 // how many workers for this type do already exist
236 int existingTypeWorkers = 0;
237
238 // how many currently active connections are there
239 int existingConnections = 0;
240
241 // all currently available possible queues for the request
242 std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
243
244 // currently idle workers
245 std::vector<std::string> idleWorkers;
246
247 // all mirrors without a existing worker
248 std::vector<zypp::Url> mirrsWithoutWorker;
249 for ( const auto &url : item->urls() ) {
250
251 if ( effectiveScheme( url.getScheme() ) != scheme ) {
252 MIL << "Mirror URL " << url << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
253 continue;
254 }
255
256 if( item->owner()->canRedirectTo( item, url ) )
257 mirrsWithoutWorker.push_back( url );
258 else {
259 MIL_PRV << "URL was rejected" << url << std::endl;
260 }
261 }
262
263 // at this point the list contains all useable mirrors, if this list is empty the request needs to fail
264 if( mirrsWithoutWorker.size() == 0 ) {
265 MIL << "Request has NO usable URLs" << std::endl;
266 if ( item->owner() )
267 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
268 i = queue.erase(i);
269 continue;
270 }
271
272
273 for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
274 if ( ProvideQueue::Config::Downloading != workerQueue->workerConfig().worker_type() )
275 continue;
276
277 existingTypeWorkers ++;
278 existingConnections += workerQueue->activeRequests();
279
280 if ( workerQueue->isIdle() )
281 idleWorkers.push_back (queueName);
282
283 if ( !zypp::str::startsWith( queueName, scheme ) )
284 continue;
285
286 for ( auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
287 const auto &u = *i;
288 if ( u.getHost() == workerQueue->hostname() ) {
289 if ( workerQueue->requestCount() < constants::DEFAULT_ACTIVE_CONN_PER_HOST )
290 possibleHostWorkers.push_back( {u, workerQueue.get()} );
291 i = mirrsWithoutWorker.erase( i );
292 // we can not stop after removing the first hit, since there could be multiple mirrors with the same hostname
293 } else {
294 ++i;
295 }
296 }
297 }
298
299 if( provideDebugEnabled() ) {
300 MIL << "Current stats: " << std::endl;
301 MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
302 MIL << "Existing active connections: " << existingConnections << std::endl;
303 MIL << "Possible host workers: "<< possibleHostWorkers.size() << std::endl;
304 MIL << "Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
305 }
306
307 // need to wait for requests to finish in order to schedule more requests
308 if ( existingConnections >= constants::DEFAULT_ACTIVE_CONN ) {
309 MIL_PRV << "Reached maximum nr of connections, break" << std::endl;
310 break;
311 }
312
313 // if no workers are running, take the first mirror and start a worker for it
314 // if < nr of workers are running, use a mirror we do not have a conn yet to
315 if ( existingTypeWorkers < constants::DEFAULT_MAX_DYNAMIC_WORKERS
316 && mirrsWithoutWorker.size() ) {
317
318 MIL_PRV << "Free worker slots and available mirror URLs, starting a new worker" << std::endl;
319
320 //@TODO out of the available mirrors use the best one based on statistics ( if available )
321 bool found = false;
322 for( const auto &url : mirrsWithoutWorker ) {
323
324 // mark this URL as used now, in case the queue can not be started we won't try it anymore
325 if ( !item->owner()->safeRedirectTo ( item, url ) )
326 continue;
327
328 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
329 if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
330 break;
331 } else {
332
333 MIL_PRV << "Started worker for " << url.getHost() << " enqueing request" << std::endl;
334
335 item->setActiveUrl(url);
336 found = true;
337
338 std::string str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
339 _workerQueues[str] = q;
340 q->enqueue( item );
341 break;
342 }
343 }
344
345 if( found ) {
346 i = queue.erase(i);
347 continue;
348 }
349 }
350
351 // if we cannot start a new worker, find the best queue where we can push the item into
352 if ( possibleHostWorkers.size() ) {
353
354 MIL_PRV << "No free worker slots, looking for the best existing worker" << std::endl;
355 bool found = false;
356 while( possibleHostWorkers.size () ) {
357 std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
358 for ( auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
359 if ( i->second->activeRequests () < candidate->second->activeRequests () )
360 candidate = i;
361 }
362
363 if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
364 possibleHostWorkers.erase( candidate );
365 continue;
366 }
367
368 MIL_PRV << "Using existing worker " << candidate->first.getHost() << " to download request" << std::endl;
369
370 found = true;
371 item->setActiveUrl( candidate->first );
372 candidate->second->enqueue( item );
373 break;
374 }
375
376 if( found ) {
377 i = queue.erase(i);
378 continue;
379 }
380 }
381
382 // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
383 // a new worker
384 if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
385
386 MIL_PRV << "No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
387
388 auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
389 if ( candidate != _workerQueues.end() ) {
390
391 // for now we decomission the worker and start a new one, should we instead introduce a "reset" message
392 // that repurposes the worker to another hostname/workdir config?
393 _workerQueues.erase(candidate);
394
395 //@TODO out of the available mirrors use the best one based on statistics ( if available )
396 bool found = false;
397 for( const auto &url : mirrsWithoutWorker ) {
398
399 if ( !item->owner()->safeRedirectTo ( item, url ) )
400 continue;
401
402 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
403 if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
404 break;
405 } else {
406
407 MIL_PRV << "Replaced worker for " << url.getHost() << ", enqueing request" << std::endl;
408
409 item->setActiveUrl(url);
410 found = true;
411
412 auto str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
413 _workerQueues[str] = q;
414 q->enqueue( item );
415 }
416 }
417
418 if( found ) {
419 i = queue.erase(i);
420 continue;
421 }
422 }
423 }
424
425 // if we reach here we skip over the item and try to schedule it again later
426 MIL_PRV << "End of line, deferring request for next try." << std::endl;
427 i++;
428
429 }
430 } else if ( config.worker_type() == ProvideQueue::Config::CPUBound && !isSingleInstance ) {
431
432 for( auto i = queue.begin (); i != queue.end(); ) {
433
434 // this is the only place where we remove elements from the queue when the scheduling flag is active
435 // other code just nulls out requests in the queue if during scheduling items need to be removed
436 while ( i != queue.end() && !(*i) ) {
437 i = queue.erase(i);
438 }
439
440 if ( i == queue.end() )
441 break;
442
443 // make a real reference so it does not dissapear when we remove it from the queue
444 ProvideRequestRef item = *i;
445
446 // CPU bound queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
447 // If we hit this code path, its a bug
448 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
449 i = queue.erase(i);
450 if ( item->owner () )
451 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
452 continue;
453 }
454
455 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
456
457 // how many workers for this type do already exist
458 int existingTypeWorkers = 0;
459 int existingSchemeWorkers = 0;
460
461 // all currently available possible queues for the request
462 std::vector< ProvideQueue* > possibleWorkers;
463
464 // currently idle workers
465 std::vector<std::string> idleWorkers;
466
467 // the URL we are going to use this time
468 zypp::Url url;
469
470 //CPU bound queues do not spawn per mirrors, we use the first compatible URL
471 for ( const auto &tmpurl : item->urls() ) {
472 if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
473 MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
474 continue;
475 }
476 url = tmpurl;
477 break;
478 }
479
480 // at this point if the URL is empty the request needs to fail
481 if( !url.isValid() ) {
482 MIL << "Request has NO usable URLs" << std::endl;
483 if ( item->owner() )
484 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
485 i = queue.erase(i);
486 continue;
487 }
488
489 for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
490
491 if ( ProvideQueue::Config::CPUBound != workerQueue->workerConfig().worker_type() )
492 continue;
493
494 const bool thisScheme = zypp::str::startsWith( queueName, scheme );
495
496 existingTypeWorkers ++;
497 if ( thisScheme ) {
498 existingSchemeWorkers++;
499 if ( workerQueue->canScheduleMore() )
500 possibleWorkers.push_back(workerQueue.get());
501 }
502
503 if ( workerQueue->isIdle() )
504 idleWorkers.push_back(queueName);
505 }
506
507 if( provideDebugEnabled() ) {
508 MIL << "Current stats: " << std::endl;
509 MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
510 MIL << "Possible CPU workers: "<< possibleWorkers.size() << std::endl;
511 }
512
513 // first we use existing idle workers of the current type
514 if ( possibleWorkers.size() ) {
515 bool found = false;
516 for ( auto &w : possibleWorkers ) {
517 if ( w->isIdle() ) {
518 MIL_PRV << "Using existing idle worker to provide request" << std::endl;
519 // this is not really required because we are not doing redirect checks
520 item->owner()->redirectTo ( item, url );
521 item->setActiveUrl( url );
522 w->enqueue( item );
523 i = queue.erase(i);
524 found = true;
525 break;
526 }
527 }
528 if ( found )
529 continue;
530 }
531
532 // we first start as many workers as we need before queueing more request to existing ones
533 if ( existingTypeWorkers < cpuLimit ) {
534
535 MIL_PRV << "Free CPU slots, starting a new worker" << std::endl;
536
537 // this is not really required because we are not doing redirect checks
538 item->owner()->redirectTo ( item, url );
539
540 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
541 if ( q->startup( scheme, _workDir / scheme ) ) {
542
543 item->setActiveUrl(url);
544
545 auto str = zypp::str::Format("%1%#%2%") % scheme % existingSchemeWorkers;
546 _workerQueues[str] = q;
547 q->enqueue( item );
548 i = queue.erase(i);
549 continue;
550 } else {
551 // CPU bound requests can not recover from this error
552 i = queue.erase(i);
553 if ( item->owner() )
554 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
555 continue;
556 }
557 }
558
559 // we can not start more workers, all we can do now is fill up queues of existing ones
560 if ( possibleWorkers.size() ) {
561 MIL_PRV << "No free CPU slots, looking for the best existing worker" << std::endl;
562
563 if( possibleWorkers.size () ) {
564 std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
565 for ( auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
566 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
567 candidate = i;
568 }
569
570 // this is not really required because we are not doing redirect checks
571 item->owner()->redirectTo ( item, url );
572
573 MIL_PRV << "Using existing worker to provide request" << std::endl;
574 item->setActiveUrl( url );
575 (*candidate)->enqueue( item );
576 i = queue.erase(i);
577 continue;
578 }
579 }
580
581 // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
582 // a new worker
583 if ( idleWorkers.size() ) {
584
585 MIL_PRV << "No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
586
587 auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
588 if ( candidate != _workerQueues.end() ) {
589
590 _workerQueues.erase(candidate);
591
592 // this is not really required because we are not doing redirect checks
593 item->owner()->redirectTo ( item, url );
594
595 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
596 if ( q->startup( scheme, _workDir / scheme ) ) {
597
598 MIL_PRV << "Replaced worker, enqueing request" << std::endl;
599
600 item->setActiveUrl(url);
601
602 auto str = zypp::str::Format("%1%#%2%") % scheme % ( existingSchemeWorkers + 1 );
603 _workerQueues[str] = q;
604 q->enqueue( item );
605 i = queue.erase(i);
606 continue;
607 } else {
608 // CPU bound requests can not recover from this error
609 i = queue.erase(i);
610 if ( item->owner() )
611 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
612 continue;
613 }
614 }
615 } else {
616 MIL_PRV << "No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
617 break;
618 }
619
620 // if we reach here we skip over the item and try to schedule it again later
621 MIL_PRV << "End of line, deferring request for next try." << std::endl;
622 i++;
623 }
624
625 } else {
626 // either SingleInstance worker or Mounting/VolatileMounting
627
628 for( auto i = queue.begin (); i != queue.end(); ) {
629
630 // this is the only place where we remove elements from the queue when the scheduling flag is active
631 // other code just nulls out requests in the queue if during scheduling items need to be removed
632 while ( i != queue.end() && !(*i) ) {
633 i = queue.erase(i);
634 }
635
636 if ( i == queue.end() )
637 break;
638
639 // make a real reference so it does not dissapear when we remove it from the queue
640 ProvideRequestRef item = *i;
641 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
642
643 zypp::Url url;
644
645 //mounting queues do not spawn per mirrors, we use the first compatible URL
646 for ( const auto &tmpurl : item->urls() ) {
647 if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
648 MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
649 continue;
650 }
651 url = tmpurl;
652 break;
653 }
654
655 // at this point if the URL is empty the request needs to fail
656 if( !url.isValid() ) {
657 MIL << "Request has NO usable URLs" << std::endl;
658 if ( item->owner() )
659 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
660 i = queue.erase(i);
661 continue;
662 }
663
664
665 ProvideQueue *qToUse = nullptr;
666 if ( !_workerQueues.count(scheme) ) {
667 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
668 if ( !q->startup( scheme, _workDir / scheme ) ) {
669 ERR << "Worker startup failed!" << std::endl;
670 // mounting/single instance requests can not recover from this error
671 i = queue.erase(i);
672
673 if ( item->owner() )
674 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
675 continue;
676 }
677
678 MIL_PRV << "Started worker, enqueing request" << std::endl;
679 qToUse = q.get();
680 _workerQueues[scheme] = q;
681 } else {
682 MIL_PRV << "Found worker, enqueing request" << std::endl;
683 qToUse = _workerQueues.at(scheme).get();
684 }
685
686 // this is not really required because we are not doing redirect checks
687 item->owner()->redirectTo ( item, url );
688
689 item->setActiveUrl(url);
690 qToUse->enqueue( item );
691 i = queue.erase(i);
692 }
693 }
694 }
695 }
696
697 std::list<ProvideItemRef> &ProvidePrivate::items()
698 {
699 return _items;
700 }
701
702 zypp::media::CredManagerOptions &ProvidePrivate::credManagerOptions ()
703 {
704 return _credManagerOptions;
705 }
706
707 std::vector<AttachedMediaInfo> &ProvidePrivate::attachedMediaInfos()
708 {
709 return _attachedMediaInfos;
710 }
711
712 expected<ProvideQueue::Config> ProvidePrivate::schemeConfig( const std::string &scheme )
713 {
714 if ( auto i = _schemeConfigs.find( scheme ); i != _schemeConfigs.end() ) {
715 return expected<ProvideQueue::Config>::success(i->second);
716 } else {
717 // we do not have the queue config yet, we need to start a worker to get one
718 ProvideQueue q( *this );
719 if ( !q.startup( scheme, _workDir / scheme ) ) {
720 return expected<ProvideQueue::Config>::error(ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to start worker to read scheme config.")));
721 }
722 auto newItem = _schemeConfigs.insert( std::make_pair( scheme, q.workerConfig() ));
723 return expected<ProvideQueue::Config>::success(newItem.first->second);
724 }
725 }
726
727 std::optional<zypp::ManagedFile> ProvidePrivate::addToFileCache( const zypp::filesystem::Pathname &downloadedFile )
728 {
729 const auto &key = downloadedFile.asString();
730
731 if ( !zypp::PathInfo(downloadedFile).isExist() ) {
732 _fileCache.erase ( key );
733 return {};
734 }
735
736 auto i = _fileCache.insert( { key, FileCacheItem() } );
737 if ( !i.second ) {
738 // file did already exist in the cache, return the shared data
739 i.first->second._deathTimer.reset();
740 return i.first->second._file;
741 }
742
743 i.first->second._file = zypp::ManagedFile( downloadedFile, zypp::filesystem::unlink );
744 return i.first->second._file;
745 }
746
747 bool ProvidePrivate::isInCache ( const zypp::Pathname &downloadedFile ) const
748 {
749 const auto &key = downloadedFile.asString();
750 return (_fileCache.count(key) > 0);
751 }
752
753 void ProvidePrivate::queueItem ( ProvideItemRef item )
754 {
755 _items.push_back( item );
756 schedule( ProvidePrivate::EnqueueItem );
757 }
758
759 void ProvidePrivate::dequeueItem( ProvideItem *item)
760 {
761 auto elem = std::find_if( _items.begin(), _items.end(), [item]( const auto &i){ return i.get() == item; } );
762 if ( elem != _items.end() ) {
763 if ( _isScheduling ) {
764 (*elem).reset();
765 } else {
766 _items.erase(elem);
767 }
768 }
769 }
770
771 std::string ProvidePrivate::nextMediaId() const
772 {
773 zypp::AutoDispose rawStr( g_uuid_string_random (), g_free );
774 return zypp::str::asString ( rawStr.value() );
775 }
776
777 AttachedMediaInfo &ProvidePrivate::addMedium(ProvideQueue::Config::WorkerType workerType, const zypp::Url &baseUrl, ProvideMediaSpec &spec )
778 {
779 auto str = nextMediaId();
780 MIL_PRV << "Generated new ID for media attachment: " << str << std::endl;
781 _attachedMediaInfos.push_back( AttachedMediaInfo{ std::move(str), {}, workerType, baseUrl, spec } );
782 return _attachedMediaInfos.back();
783 }
784
785 AttachedMediaInfo &ProvidePrivate::addMedium(zypp::proto::Capabilities::WorkerType workerType, ProvideQueueWeakRef backingQueue, const std::string &id, const zypp::Url &baseUrl, ProvideMediaSpec &spec)
786 {
787 MIL_PRV << "New media attachment with id: " << id << std::endl;
788 _attachedMediaInfos.push_back( AttachedMediaInfo{ id, backingQueue, workerType, baseUrl, spec } );
789 return _attachedMediaInfos.back();
790 }
791
792 bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
793 {
794 const auto &schemeName = effectiveScheme( req->url().getScheme() );
795 auto existingQ = std::find_if( _queues.begin (), _queues.end(), [&schemeName]( const auto &qItem) {
796 return (qItem._schemeName == schemeName);
797 });
798 if ( existingQ != _queues.end() ) {
799 existingQ->_requests.push_back(req);
800 } else {
801 _queues.push_back( ProvidePrivate::QueueItem{ schemeName, {req} } );
802 }
803
804 schedule( ProvidePrivate::EnqueueReq );
805 return true;
806 }
807
808 bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
809 {
810 auto queue = req->currentQueue ();
811 if ( queue ) {
812 queue->cancel( req.get(), error );
813 return true;
814 } else {
815 // Request not started yet, search request queues
816 for ( auto &q : _queues ) {
817 auto elem = std::find( q._requests.begin(), q._requests.end(), req );
818 if ( elem != q._requests.end() ) {
819 q._requests.erase(elem);
820
821 if ( req->owner() )
822 req->owner()->finishReq( nullptr, req, error );
823 return true;
824 }
825 }
826 }
827 return false;
828 }
829
830 const zypp::Pathname &ProvidePrivate::workerPath() const
831 {
832 return _workerPath;
833 }
834
835 const std::string ProvidePrivate::queueName( ProvideQueue &q ) const
836 {
837 for ( const auto &v : _workerQueues ) {
838 if ( v.second.get() == &q )
839 return v.first;
840 }
841 return {};
842 }
843
844 bool ProvidePrivate::isRunning() const
845 {
846 return _isRunning;
847 }
848
849 std::string ProvidePrivate::effectiveScheme(const std::string &scheme) const
850 {
851 const std::string &ss = zypp::str::stripSuffix( scheme, constants::ATTACHED_MEDIA_SUFFIX );
852 if ( auto it = _workerAlias.find ( ss ); it != _workerAlias.end () ) {
853 return it->second;
854 }
855 return ss;
856 }
857
858 void ProvidePrivate::onPulseTimeout( Timer & )
859 {
860 DBG_PRV << "Pulse timeout" << std::endl;
861
862 auto now = std::chrono::steady_clock::now();
863
864 if ( _log ) _log->pulse();
865
866 // release old cache files
867 for ( auto i = _fileCache.begin (); i != _fileCache.end(); ) {
868 auto &cacheItem = i->second;
869 if ( cacheItem._file.unique() ) {
870 if ( cacheItem._deathTimer ) {
871 if ( now - *cacheItem._deathTimer < std::chrono::seconds(20) ) {
872 MIL << "Releasing file " << *i->second._file << " from cache, death timeout." << std::endl;
873 i = _fileCache.erase(i);
874 continue;
875 }
876 } else {
877 // start the death timeout
878 cacheItem._deathTimer = std::chrono::steady_clock::now();
879 }
880 }
881
882 ++i;
883 }
884 }
885
886 void ProvidePrivate::onQueueIdle()
887 {
888 if ( !_items.empty() )
889 return;
890 for ( auto &[k,q] : _workerQueues ) {
891 if ( !q->empty() )
892 return;
893 }
894
895 // all queues are empty
896 _sigIdle.emit();
897 }
898
899 void ProvidePrivate::onItemStateChanged( ProvideItem &item )
900 {
901 if ( item.state() == ProvideItem::Finished ) {
902 auto itemRef = item.shared_this<ProvideItem>();
903 auto i = std::find( _items.begin(), _items.end(), itemRef );
904 if ( i == _items.end() ) {
905 ERR << "State of unknown Item changed, ignoring" << std::endl;
906 return;
907 }
908 if ( _isScheduling )
909 i->reset();
910 else
911 _items.erase(i);
912 }
913 if ( _items.empty() )
914 onQueueIdle();
915 }
916
917 uint32_t ProvidePrivate::nextRequestId()
918 {
919 //@TODO is it required to handle overflow?
920 return ++_nextRequestId;
921 }
922
924 {
925 Data( Provide &parent, const std::string &hdl )
926 : _parent( parent.weak_this<Provide>() )
927 , _hdlName(hdl) { }
928
930 auto p = _parent.lock(); if (p) p->releaseMedia(_hdlName);
931 }
932
933 ProvideWeakRef _parent;
934 std::string _hdlName;
935 };
936
937 ProvideMediaHandle::ProvideMediaHandle( Provide &parent, const std::string &hdl )
938 : _ref( std::make_shared<ProvideMediaHandle::Data>( parent, hdl ) )
939 {}
940
941 std::shared_ptr<Provide> ProvideMediaHandle::parent() const
942 {
943 return _ref->_parent.lock();
944 }
945
947 {
948 return ( _ref.get() != nullptr );
949 }
950
951 std::string ProvideMediaHandle::handle() const
952 {
953 return _ref->_hdlName;
954 }
955
956
957 Provide::Provide( const zypp::Pathname &workDir ) : Base( *new ProvidePrivate( zypp::Pathname(workDir), *this ) )
958 {
959 Z_D();
960 connect( *d->_pulseTimer, &Timer::sigExpired, *d, &ProvidePrivate::onPulseTimeout );
961 }
962
963 ProvideRef Provide::create( const zypp::filesystem::Pathname &workDir )
964 {
965 return ProvideRef( new Provide(workDir) );
966 }
967
968 AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const zypp::Url &url, const ProvideMediaSpec &request )
969 {
970 return attachMedia ( std::vector<zypp::Url>{url}, request );
971 }
972
973 AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request )
974 {
975 Z_D();
976
977 if ( urls.empty() ) {
978 return makeReadyResult( expected<Provide::MediaHandle>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("No usable mirrors in mirrorlist."))) );
979 }
980
981 // sanitize the mirrors to contain only URLs that have same worker types
982 std::vector<zypp::Url> usableMirrs;
983 std::optional<ProvideQueue::Config> scheme;
984
985 for ( auto mirrIt = urls.begin() ; mirrIt != urls.end(); mirrIt++ ) {
986 const auto &s = d->schemeConfig( d->effectiveScheme( mirrIt->getScheme() ) );
987 if ( !s ) {
988 WAR << "URL: " << *mirrIt << " is not supported, ignoring!" << std::endl;
989 continue;
990 }
991 if ( !scheme ) {
992 scheme = *s;
993 usableMirrs.push_back ( *mirrIt );
994 } else {
995 if ( scheme->worker_type () == s->worker_type () ) {
996 usableMirrs.push_back( *mirrIt );
997 } else {
998 WAR << "URL: " << *mirrIt << " has different worker type than the primary URL: "<< usableMirrs.front() <<", ignoring!" << std::endl;
999 }
1000 }
1001 }
1002
1003 if ( !scheme || usableMirrs.empty() ) {
1004 return makeReadyResult( expected<Provide::MediaHandle>::error( ZYPP_EXCPT_PTR ( zypp::media::MediaException("No valid mirrors available") )) );
1005 }
1006
1007 // first check if there is a already attached medium we can use as well
1008 auto &attachedMedia = d->attachedMediaInfos ();
1009 for ( auto &medium : attachedMedia ) {
1010 if ( medium.isSameMedium ( usableMirrs, request ) ) {
1011 medium.ref();
1012 return makeReadyResult( expected<Provide::MediaHandle>::success( Provide::MediaHandle( *this, medium._name) ) );
1013 }
1014 }
1015
1016 auto op = AttachMediaItem::create( usableMirrs, request, *d_func() );
1017 d->queueItem (op);
1018 return op->promise();
1019 }
1020
1021 void Provide::releaseMedia( const std::string &mediaRef )
1022 {
1023 Z_D();
1024
1025 if ( mediaRef.empty() )
1026 return;
1027
1028 const auto i = std::find_if( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), [&]( const auto &info ){ return info._name == mediaRef;} );
1029 if ( i == d->_attachedMediaInfos.end() ) {
1030 ERR << "Unknown media attach handle" << std::endl;
1031 return;
1032 }
1033
1034 // only unref'ing here, the scheduler will generate a message to the queues if needed
1035 i->unref();
1036 }
1037
1038 AsyncOpRef< expected<ProvideRes> > Provide::provide( const std::vector<zypp::Url> &urls, const ProvideFileSpec &request )
1039 {
1040 Z_D();
1041 auto op = ProvideFileItem::create( urls, request, *d );
1042 d->queueItem (op);
1043 return op->promise();
1044 }
1045
1046 AsyncOpRef< expected<ProvideRes> > Provide::provide( const zypp::Url &url, const ProvideFileSpec &request )
1047 {
1048 return provide( std::vector<zypp::Url>{ url }, request );
1049 }
1050
1051 AsyncOpRef< expected<ProvideRes> > Provide::provide( const MediaHandle &attachHandle, const zypp::Pathname &fileName, const ProvideFileSpec &request )
1052 {
1053 Z_D();
1054 const auto i = std::find_if( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), [&]( const auto &info ){ return info._name == attachHandle.handle();} );
1055 if ( i == d->_attachedMediaInfos.end() ) {
1056 return makeReadyResult( expected<ProvideRes>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("Invalid attach handle")) ) );
1057 }
1058
1059 // for downloading items we need to make the baseUrl part of the request URL
1060 zypp::Url url = i->_attachedUrl;
1061
1062 // real mount devices use a ID to reference a attached medium, for those we do not need to send the baseUrl as well since its already
1063 // part of the mount point, so if we mount host:/path/to/repo to the ID 1234 and look for the file /path/to/repo/file1 the request URL will look like: nfs-media://1234/file1
1064 if ( i->_workerType == ProvideQueue::Config::SimpleMount || i->_workerType == ProvideQueue::Config::VolatileMount ) {
1065 url = zypp::Url();
1066 // work around the zypp::Url requirements for certain Url schemes by attaching a suffix, that way we are always able to have a authority
1067 url.setScheme( i->_attachedUrl.getScheme() + std::string(constants::ATTACHED_MEDIA_SUFFIX) );
1068 url.setAuthority( i->_name );
1069 url.setPathName("/");
1070 }
1071
1072 url.appendPathName( fileName );
1073 auto op = ProvideFileItem::create( {url}, request, *d );
1074
1075 i->ref();
1076 op->setMediaRef( MediaHandle( *this, i->_name ));
1077
1078 d->queueItem (op);
1079 return op->promise();
1080 }
1081
1082 AsyncOpRef<expected<std::string>> Provide::checksumForFile ( const zypp::Pathname &p, const std::string &algorithm )
1083 {
1084 using namespace zyppng::operators;
1085
1086 zypp::Url url("chksum:///");
1087 url.setPathName( p );
1088 auto fut = provide( url, zyppng::ProvideFileSpec().setCustomHeaderValue( "chksumType", algorithm ) )
1089 | mbind( [algorithm]( zyppng::ProvideRes &&chksumRes ) {
1090 if ( chksumRes.headers().contains(algorithm) )
1091 return expected<std::string>::success( chksumRes.headers().value(algorithm).asString() );
1092 return expected<std::string>::error( ZYPP_EXCPT_PTR( zypp::FileCheckException("Invalid/Empty checksum returned from worker") ) );
1093 } );
1094 return fut;
1095 }
1096
1097 AsyncOpRef<expected<zypp::ManagedFile>> Provide::copyFile ( const zypp::Pathname &source, const zypp::Pathname &target )
1098 {
1099 using namespace zyppng::operators;
1100
1101 zypp::Url url("copy:///");
1102 url.setPathName( source );
1103 auto fut = provide( url, ProvideFileSpec().setDestFilenameHint( target ))
1104 | mbind( [&]( ProvideRes &&copyRes) {
1105 return expected<zypp::ManagedFile>::success( copyRes.asManagedFile() );
1106 } );
1107 return fut;
1108 }
1109
1111 {
1112 Z_D();
1113 d->_isRunning = true;
1114 d->_pulseTimer->start( 5000 );
1115 d->schedule( ProvidePrivate::ProvideStart );
1116 if ( d->_log ) d->_log->provideStart();
1117 }
1118
1120 {
1121 d_func()->_workerPath = path;
1122 }
1123
1124 bool Provide::ejectDevice(const std::string &queueRef, const std::string &device)
1125 {
1126 if ( !queueRef.empty() ) {
1127 return zypp::media::CDTools::openTray(device);
1128 }
1129 return false;
1130 }
1131
1132 void Provide::setStatusTracker( ProvideStatusRef tracker )
1133 {
1134 d_func()->_log = tracker;
1135 }
1136
1138 {
1139 return d_func()->_workDir;
1140 }
1141
1143 {
1144 Z_D();
1145 return d->_credManagerOptions;
1146 }
1147
1149 {
1150 d_func()->_credManagerOptions = opt;
1151 }
1152
1153 SignalProxy<void ()> Provide::sigIdle()
1154 {
1155 return d_func()->_sigIdle;
1156 }
1157
1158 SignalProxy<Provide::MediaChangeAction ( const std::string &queueRef, const std::string &, const int32_t, const std::vector<std::string> &, const std::optional<std::string> &)> Provide::sigMediaChangeRequested()
1159 {
1160 return d_func()->_sigMediaChange;
1161 }
1162
1163 SignalProxy< std::optional<zypp::media::AuthData> ( const zypp::Url &reqUrl, const std::string &triedUsername, const std::map<std::string, std::string> &extraValues ) > Provide::sigAuthRequired()
1164 {
1165 return d_func()->_sigAuthRequired;
1166 }
1167
1169
1170 ProvideStatus::ProvideStatus( ProvideRef parent )
1171 : _provider( parent )
1172 { }
1173
1175 {
1176 _stats = Stats();
1177 _stats._startTime = std::chrono::steady_clock::now();
1178 _stats._lastPulseTime = std::chrono::steady_clock::now();
1179 }
1180
1182 {
1183 const auto &sTime = item.startTime();
1184 const auto &fTime = item.finishedTime();
1185 if ( sTime > sTime.min() && fTime >= sTime ) {
1186 auto duration = std::chrono::duration_cast<std::chrono::seconds>( item.finishedTime() - item.startTime() );
1187 if ( duration.count() )
1188 MIL << "Item finished after " << duration.count() << " seconds, with " << zypp::ByteCount( item.currentStats()->_bytesProvided.operator zypp::ByteCount::SizeType() / duration.count() ) << "/s" << std::endl;
1189 MIL << "Item finished after " << (item.finishedTime() - item.startTime()).count() << " ns" << std::endl;
1190 }
1191 pulse( );
1192 }
1193
1195 {
1196 MIL << "Item failed" << std::endl;
1197 }
1198
1200 {
1201 return _stats;
1202 }
1203
1205 {
1206 auto prov = _provider.lock();
1207 if ( !prov )
1208 return;
1209
1210 const auto lastFinishedBytes = _stats._finishedBytes;
1211 const auto lastPartialBytes = _stats._partialBytes;
1212 _stats._expectedBytes = _stats._finishedBytes; // finished bytes are expected too!
1213 zypp::ByteCount tmpPartialBytes (0); // bytes that are finished in staging, but not commited to cache yet
1214
1215 for ( const auto &i : prov->d_func()->items() ) {
1216
1217 if ( !i // maybe released during scheduling
1218 || i->state() == ProvideItem::Cancelling )
1219 continue;
1220
1221 if ( i->state() == ProvideItem::Uninitialized
1222 || i->state() == ProvideItem::Pending ) {
1223 _stats._expectedBytes += i->bytesExpected();
1224 continue;
1225 }
1226
1227 i->pulse();
1228
1229 const auto & stats = i->currentStats();
1230 const auto & prevStats = i->previousStats();
1231 if ( !stats || !prevStats ) {
1232 ERR << "Bug! Stats should be initialized by now" << std::endl;
1233 continue;
1234 }
1235
1236 if ( i->state() == ProvideItem::Downloading
1237 || i->state() == ProvideItem::Processing
1238 || i->state() == ProvideItem::Finalizing ) {
1239 _stats._expectedBytes += stats->_bytesExpected;
1240 tmpPartialBytes += stats->_bytesProvided;
1241 } else if ( i->state() == ProvideItem::Finished ) {
1242 _stats._finishedBytes += stats->_bytesProvided; // remember those bytes are finished in stats directly
1243 _stats._expectedBytes += stats->_bytesProvided;
1244 }
1245 }
1246
1247 const auto now = std::chrono::steady_clock::now();
1248 const auto sinceLast = std::chrono::duration_cast<std::chrono::seconds>( now - _stats._lastPulseTime );
1249 const auto lastFinB = lastPartialBytes + lastFinishedBytes;
1250 const auto currFinB = tmpPartialBytes + _stats._finishedBytes;
1251
1252 const auto diff = currFinB - lastFinB;
1253 _stats._lastPulseTime = now;
1254 _stats._partialBytes = tmpPartialBytes;
1255
1256 if ( sinceLast >= std::chrono::seconds(1) )
1257 _stats._perSecondSinceLastPulse = ( diff / ( sinceLast.count() ) );
1258
1259 auto sinceStart = std::chrono::duration_cast<std::chrono::seconds>( _stats._lastPulseTime - _stats._startTime );
1260 if ( sinceStart.count() ) {
1261 const size_t diff = _stats._finishedBytes + _stats._partialBytes;
1262 _stats._perSecond = zypp::ByteCount( diff / sinceStart.count() );
1263 }
1264 }
1265}
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition: AutoDispose.h:94
reference value() const
Reference to the Tp object.
Definition: AutoDispose.h:147
Store and operate with byte count.
Definition: ByteCount.h:31
Unit::ValueType SizeType
Definition: ByteCount.h:37
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:50
Base class for Exception.
Definition: Exception.h:146
Url manipulation class.
Definition: Url.h:92
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:533
void setAuthority(const std::string &authority)
Set the authority component in the URL.
Definition: Url.cc:698
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
Definition: Url.cc:764
bool isValid() const
Verifies the Url.
Definition: Url.cc:489
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
Definition: Url.cc:786
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Definition: Url.cc:668
Wrapper class for stat/lstat.
Definition: PathInfo.h:221
const std::string & asString() const
String representation.
Definition: Pathname.h:91
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
Pathname realpath() const
Returns this path as the absolute canonical pathname.
Definition: Pathname.cc:231
static bool openTray(const std::string &device_r)
Definition: cdtools.cc:33
Just inherits Exception to separate media exceptions.
static AttachMediaItemRef create(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request, ProvidePrivate &parent)
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
Definition: provideitem.cc:564
virtual std::chrono::steady_clock::time_point startTime() const
Definition: provideitem.cc:132
virtual std::chrono::steady_clock::time_point finishedTime() const
Definition: provideitem.cc:137
State state() const
Definition: provideitem.cc:494
const std::optional< ItemStats > & currentStats() const
Definition: provideitem.cc:122
std::string handle() const
Definition: provide.cc:951
std::shared_ptr< Provide > parent() const
Definition: provide.cc:941
std::shared_ptr< Data > _ref
Definition: provide.h:56
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:835
void doSchedule(Timer &)
Definition: provide.cc:70
std::string effectiveScheme(const std::string &scheme) const
Definition: provide.cc:849
std::vector< AttachedMediaInfo > _attachedMediaInfos
Definition: provide_p.h:139
std::list< ProvideItemRef > _items
Definition: provide_p.h:129
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Definition: provide.cc:21
Timer::Ptr _scheduleTrigger
Definition: provide_p.h:126
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
Definition: provide_p.h:141
void onPulseTimeout(Timer &)
Definition: provide.cc:858
std::deque< QueueItem > _queues
Definition: provide_p.h:136
zypp::Pathname _workDir
Definition: provide_p.h:127
void schedule(ScheduleReason reason)
Definition: provide.cc:38
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
Definition: provide.cc:712
std::chrono::time_point< std::chrono::steady_clock > TimePoint
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:57
const Config & workerConfig() const
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:92
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
Definition: provideitem.cc:75
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
Definition: provideres.h:36
virtual void provideStart()
Definition: provide.cc:1174
const Stats & stats() const
Definition: provide.cc:1199
virtual void pulse()
Definition: provide.cc:1204
ProvideWeakRef _provider
Definition: provide.h:97
ProvideStatus(ProvideRef parent)
Definition: provide.cc:1170
virtual void itemFailed(ProvideItem &item)
Definition: provide.cc:1194
virtual void itemDone(ProvideItem &item)
Definition: provide.cc:1181
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
Definition: provide.cc:1038
const zypp::media::CredManagerOptions & credManangerOptions() const
Definition: provide.cc:1142
SignalProxy< std::optional< zypp::media::AuthData >(const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) > sigAuthRequired()
Definition: provide.cc:1163
static ProvideRef create(const zypp::Pathname &workDir="")
Definition: provide.cc:963
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition: provide.cc:973
void setWorkerPath(const zypp::Pathname &path)
Definition: provide.cc:1119
SignalProxy< void()> sigIdle()
Definition: provide.cc:1153
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Definition: provide.cc:1148
Provide(const zypp::Pathname &workDir)
Definition: provide.cc:957
void setStatusTracker(ProvideStatusRef tracker)
Definition: provide.cc:1132
bool ejectDevice(const std::string &queueRef, const std::string &device)
Definition: provide.cc:1124
AsyncOpRef< expected< std::string > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
Definition: provide.cc:1082
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
Definition: provide.cc:1097
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)> sigMediaChangeRequested()
Definition: provide.cc:1158
std::optional< Action > MediaChangeAction
Definition: provide.h:149
const zypp::Pathname & providerWorkdir() const
Definition: provide.cc:1137
ProvideMediaHandle MediaHandle
Definition: provide.h:109
void releaseMedia(const std::string &mediaRef)
Definition: provide.cc:1021
Definition: Arch.h:361
String related utilities and Regular expression matching.
int unlink(const Pathname &path)
Like 'unlink'.
Definition: PathInfo.cc:700
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
Definition: String.h:139
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
Definition: String.h:1085
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Definition: String.h:1048
Easy-to use interface to the ZYPP dependency resolver.
Definition: CodePitfalls.doc:2
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
Definition: ManagedFile.h:27
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
Definition: provide_p.h:39
constexpr auto DEFAULT_ACTIVE_CONN
Definition: provide_p.h:40
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
Definition: provide_p.h:41
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
Definition: provide_p.h:38
bool provideDebugEnabled()
Definition: providedbg_p.h:28
ZYPP_IMPL_PRIVATE(Provide)
#define DBG_PRV
Definition: providedbg_p.h:34
#define MIL_PRV
Definition: providedbg_p.h:35
Convenient building of std::string with boost::format.
Definition: String.h:253
Data(Provide &parent, const std::string &hdl)
Definition: provide.cc:925
zypp::ByteCount _partialBytes
Definition: provide.h:74
zypp::ByteCount _perSecondSinceLastPulse
Definition: provide.h:75
zypp::ByteCount _perSecond
Definition: provide.h:76
zypp::ByteCount _expectedBytes
Definition: provide.h:73
std::chrono::steady_clock::time_point _startTime
Definition: provide.h:68
zypp::ByteCount _finishedBytes
Definition: provide.h:72
std::chrono::steady_clock::time_point _lastPulseTime
Definition: provide.h:69
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
#define DBG
Definition: Logger.h:95
#define MIL
Definition: Logger.h:96
#define ERR
Definition: Logger.h:98
#define WAR
Definition: Logger.h:97
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:113