libzypp 17.31.23
providequeue.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8\---------------------------------------------------------------------*/
9
12#include "private/provide_p.h"
15
16#include <zypp-core/fs/PathInfo.h>
17#include <zypp-core/zyppng/rpc/MessageStream>
18#include <zypp-core/base/StringV.h>
20#include <zypp-media/MediaException>
21#include <zypp-media/auth/CredentialManager>
22
23#include <zypp/APIConfig.h>
24#include <variant>
25#include <bitset>
26
27namespace zyppng {
28
30 {
31 return ( _request->code () == ProvideMessage::Code::Attach );
32 }
33
35 {
36 return ( _request->code () == ProvideMessage::Code::Provide );
37 }
38
40 {
41 return ( _request->code () == ProvideMessage::Code::Detach );
42 }
43
45 { }
46
48 {
50 if ( this->_activeItems.size() || this->_waitQueue.size() ) {
51 DBG << "Queue shutdown with Items still running" << std::endl;
52 }
53 }
54 immediateShutdown(std::make_exception_ptr(zypp::media::MediaException("Cancelled by queue shutdown")));
55 }
56
// Prepares and launches the worker process that serves this queue.
//
// Behaviour visible in this listing:
//  - If a worker process object already exists an error is logged and true is returned.
//  - The worker binary is looked up as "zypp-media-<workerScheme>" below _parent.workerPath();
//    false is returned if it does not exist or is not executable for the user.
//  - assert_dir() is called on workDir; false is returned if it reports a non-zero result.
//  - Otherwise the path is remembered in _currentExe, the process object and the RPC message
//    stream are created and the result of doStartup() is returned.
//
// NOTE(review): the embedded listing numbering jumps from 63 to 65, so one statement of the
// original function is not visible here. \a hostname is not used by any visible line -
// presumably it is stored by the missing statement; verify against the real source.
57 bool ProvideQueue::startup(const std::string &workerScheme, const zypp::filesystem::Pathname &workDir, const std::string &hostname ) {
58
59 if ( _workerProc ) {
60 ERR << "Queue Worker was already initialized" << std::endl;
61 return true;
62 }
63
65
// worker executables follow the naming scheme zypp-media-<scheme>
66 const auto &pN = _parent.workerPath() / ( "zypp-media-"+workerScheme ) ;
67 MIL << "Trying to start " << pN << std::endl;
68 const auto &pi = zypp::PathInfo( pN );
69 if ( !pi.isExist() ) {
70 ERR << "Failed to find worker for " << workerScheme << std::endl;
71 return false;
72 }
73
74 if ( !pi.userMayX() ) {
75 ERR << "Failed to start worker for " << workerScheme << " binary " << pi.asString() << " is not executable." << std::endl;
76 return false;
77 }
78
79 if ( zypp::filesystem::assert_dir( workDir ) != 0 ) {
80 ERR << "Failed to assert working directory '" << workDir << "' for worker " << workerScheme << std::endl;
81 return false;
82 }
83
// the message stream is created on top of the (not yet started) process; doStartup() starts it
84 _currentExe = pN;
85 _workerProc = Process::create();
86 _workerProc->setWorkingDirectory ( workDir );
87 _messageStream = RpcMessageStream::create( _workerProc );
88 return doStartup();
89 }
90
91
92 void ProvideQueue::enqueue( ProvideRequestRef request )
93 {
94 Item i;
95 i._request = request;
96 i._request->provideMessage().setRequestId( nextRequestId() );
97 request->setCurrentQueue( shared_this<ProvideQueue>() );
98 _waitQueue.push_back( std::move(i) );
99 if ( _parent.isRunning() )
100 scheduleNext();
101 }
102
103 void ProvideQueue::cancel( ProvideRequest *item , std::exception_ptr error )
104 {
105 const auto &isSameItem = [item]( const Item &i ){
106 if ( i.isDetachRequest () )
107 return false;
108 return i._request.get() == item;
109 };
110
111 if ( !item )
112 return;
113
114 if ( item->code() != ProvideMessage::Code::Attach
115 && item->code() != ProvideMessage::Code::Provide ) {
116 ERR << "Can not cancel a " << item->code() << " request!" << std::endl;
117 return;
118 }
119
120 if ( auto i = std::find_if( _waitQueue.begin(), _waitQueue.end(), isSameItem ); i != _waitQueue.end() ) {
121 auto &reqRef = i->_request;
122 reqRef->setCurrentQueue(nullptr);
123 if ( reqRef->owner() )
124 reqRef->owner()->finishReq( this, reqRef, error );
125 _waitQueue.erase(i);
126 _parent.schedule( ProvidePrivate::FinishReq ); // let the parent scheduler run since we have a open spot now
127 } else if ( auto i = std::find_if( _activeItems.begin(), _activeItems.end(), isSameItem ); i != _activeItems.end() ) {
128 cancelActiveItem(i, error);
129 }
130 }
131
132 std::list<ProvideQueue::Item>::iterator ProvideQueue::dequeueActive( std::list<Item>::iterator it )
133 {
134 if ( it == _activeItems.end() )
135 return it;
136
137 if ( it->_request )
138 it->_request->setCurrentQueue( nullptr );
139
140 auto i = _activeItems.erase(it);
141 _parent.schedule ( ProvidePrivate::FinishReq ); // Trigger the scheduler
142 scheduleNext (); // keep the active items full
143 return i;
144 }
145
146 void ProvideQueue::fatalWorkerError( const std::exception_ptr &reason )
147 {
148 immediateShutdown( reason ? reason : std::make_exception_ptr( zypp::media::MediaException("Fatal worker error")) );
149 }
150
// Marks the queue as shutting down and fails everything that is still queued with \a reason.
//
//  - Every item in the wait queue is popped; for non-detach requests that have an owner,
//    the owner's finishReq() is called with \a reason.
//  - Every active non-detach item with an owner is passed to cancelActiveItem(); other
//    active items are skipped and stay in the list.
//  - If the worker process is still running its pending data is flushed, the write channel
//    is closed and the call blocks in waitForExit().
//
// NOTE(review): the embedded listing numbering jumps from 175 to 177, so one statement inside
// the final if-block is not visible here; check the real source before changing this code.
151 void ProvideQueue::immediateShutdown( const std::exception_ptr &reason )
152 {
153 _queueShuttingDown = true;
154
155 while ( _waitQueue.size() ) {
156 auto &item = _waitQueue.front();
157 auto &reqRef = item._request;
158 if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
159 reqRef->owner()->finishReq( this, reqRef, reason );
160 _waitQueue.pop_front();
161 }
162
// cancelActiveItem() returns the iterator to continue with, so only advance manually when it is not called
163 for ( auto i = _activeItems.begin(); i != _activeItems.end(); ) {
164 auto &reqRef = i->_request;
165 if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
166 i = cancelActiveItem(i, reason );
167 } else {
168 i++;
169 }
170 }
171
172 if ( _workerProc && _workerProc->isRunning() ) {
173 _workerProc->flush();
174 _workerProc->closeWriteChannel();
175 _workerProc->waitForExit();
177 }
178 }
179
180 std::list< ProvideQueue::Item >::iterator ProvideQueue::cancelActiveItem( std::list< Item >::iterator i , const std::__exception_ptr::exception_ptr &error )
181 {
182 auto &reqRef = i->_request;
183
184 // already in cancelling process or finished
185 if ( i->_state == Item::Cancelling || i->_state == Item::Finished )
186 return (++i);
187
188 // not possible but lets be safe
189 if ( i->_state == Item::Pending ) {
190 reqRef->setCurrentQueue(nullptr);
191 if ( reqRef->owner() )
192 reqRef->owner()->finishReq( this, reqRef, error );
193 return dequeueActive(i);
194 }
195
196 // we first need to cancel the item
197 auto c = ProvideMessage::createCancel ( i->_request->provideMessage().requestId() );
198 if( !_messageStream->sendMessage(c.impl()) )
199 ERR << "Failed to send cancel message to worker" << std::endl;
200
201 i->_state = Item::Cancelling;
202 reqRef->setCurrentQueue(nullptr);
203 if ( reqRef->owner() )
204 reqRef->owner()->finishReq( this, reqRef, error );
205 reqRef.reset();
206 return (++i);
207 }
208
210 {
211 if ( _queueShuttingDown )
212 return;
213
214 while ( _waitQueue.size() && canScheduleMore() ) {
215 auto item = std::move( _waitQueue.front() );
216 _waitQueue.pop_front();
217
218 auto &reqRef = item._request;
219 if ( !reqRef->activeUrl() ) {
220 ERR << "Item without active URL enqueued, this is a BUG." << std::endl;
221 if ( reqRef->owner() )
222 reqRef->owner()->finishReq( this, reqRef, ZYPP_EXCPT_PTR (zypp::media::MediaException("Item needs a activeURL to be queued.")) );
223 continue;
224 }
225
226 if ( !_messageStream->sendMessage( reqRef->provideMessage().impl() ) ) {
227 ERR << "Failed to send message to worker process." << std::endl;
228 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
229 return;
230 }
231
232 item._state = Item::Queued;
233 _activeItems.push_back( std::move(item) );
234 _idleSince.reset();
235 }
236
237 if ( _waitQueue.empty() && _activeItems.empty() ) {
239 if ( !_idleSince )
240 _idleSince = std::chrono::steady_clock::now();
241 _sigIdle.emit();
242 }
243 }
244
246 {
247 return ( _activeItems.size() == 0 || ( _capabilities.cfg_flags () & zypp::proto::Capabilities::Pipeline ) == zypp::proto::Capabilities::Pipeline );
248 }
249
251 {
252 return ( empty() );
253 }
254
255 std::optional<ProvideQueue::TimePoint> ProvideQueue::idleSince() const
256 {
257 return _idleSince;
258 }
259
261 {
262 return ( _activeItems.empty() && _waitQueue.empty() );
263 }
264
266 {
267 return _activeItems.size() + _waitQueue.size();
268 }
269
271 {
272 return _activeItems.size();
273 }
274
276 {
277 zypp::ByteCount dlSize;
278 for ( const auto &i : _waitQueue ) {
279 if ( i.isDetachRequest () )
280 continue;
281
282 auto &reqRef = i._request;
283 if ( reqRef->code() != ProvideMessage::Code::Provide )
284 continue;
285 dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
286 }
287 for ( const auto &i : _activeItems ) {
288 if ( i.isDetachRequest () )
289 continue;
290 auto &reqRef = i._request;
291 if ( reqRef->code() != ProvideMessage::Code::Provide )
292 continue;
293 dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
294 }
295 return dlSize;
296 }
297
298 const std::string &ProvideQueue::hostname() const
299 {
300 return _myHostname;
301 }
302
304 {
305 return _capabilities;
306 }
307
308 SignalProxy<void ()> ProvideQueue::sigIdle()
309 {
310 return _sigIdle;
311 }
312
314 {
315 if ( _currentExe.empty() )
316 return false;
317
318 //const char *argv[] = { "gdbserver", ":10000", _currentExe.c_str(), nullptr };
319 const char *argv[] = { _currentExe.c_str(), nullptr };
320 if ( !_workerProc->start( argv) ) {
321 ERR << "Failed to execute worker" << std::endl;
322
323 _messageStream.reset ();
324 _workerProc.reset ();
325
326 return false;
327 }
328
329 // make sure the default read channel is StdOut so RpcMessageStream gets all the rpc messages
330 _workerProc->setReadChannel ( Process::StdOut );
331
332 // we are ready to send the data
333
334 zypp::proto::Configuration conf;
335 // @TODO actually write real config data :D
336 conf.mutable_values ()->insert ( { AGENT_STRING_CONF.data (), "ZYpp " LIBZYPP_VERSION_STRING } );
337 conf.mutable_values ()->insert ( { ATTACH_POINT.data (), _workerProc->workingDirectory().asString() } );
338 conf.mutable_values ()->insert ( { PROVIDER_ROOT.data (), _parent.z_func()->providerWorkdir().asString() } );
339
340 const auto &cleanupOnErr = [&](){
342 _messageStream.reset ();
343 _workerProc->close();
344 _workerProc.reset();
345 return false;
346 };
347
348 if ( !_messageStream->sendMessage( conf ) ) {
349 ERR << "Failed to send initial message to queue worker" << std::endl;
350 return cleanupOnErr();
351 }
352
353 // wait for the data to be written
354 _workerProc->flush ();
355
356 // wait until we receive a message
357 const auto &caps = _messageStream->nextMessageWait();
358 if ( !caps || caps->messagetypename() != rpc::messageTypeName<zypp::proto::Capabilities>() ) {
359 ERR << "Worker did not sent a capabilities message, aborting" << std::endl;
360 return cleanupOnErr();
361 }
362
363 {
364 auto p = _messageStream->parseMessage<zypp::proto::Capabilities>( *caps );
365 if ( !p )
366 return cleanupOnErr();
367
368 _capabilities = std::move(*p);
369 }
370
371 DBG << "Received config for worker: " << this->_currentExe.asString() << " Worker Type: " << this->_capabilities.worker_type() << " Flags: " << std::bitset<32>( _capabilities.cfg_flags() ).to_string() << std::endl;
372
373 // now we can set up signals and start processing messages
374 connect( *_messageStream, &RpcMessageStream::sigMessageReceived, *this, &ProvideQueue::processMessage );
375 connect( *_workerProc, &IODevice::sigChannelReadyRead, *this, &ProvideQueue::processReadyRead );
376 connect( *_workerProc, &Process::sigFinished, *this, &ProvideQueue::procFinished );
377
378 // make sure we do not miss messages
380 return true;
381 }
382
384
385 const auto &getRequest = [&]( const auto &exp ) -> decltype(_activeItems)::iterator {
386 if ( !exp ) {
387 ERR << "Ignoring invalid request!" << std::endl;
388 return _activeItems.end();
389 }
390
391 auto i = std::find_if( _activeItems.begin(), _activeItems.end(), [&]( const auto &elem ) {
392 return exp->requestId() == elem._request->provideMessage().requestId();
393 });
394
395 if ( i == _activeItems.end() ) {
396 ERR << "Ignoring unknown request ID: " << exp->requestId() << std::endl;
397 return _activeItems.end();
398 }
399
400 return i;
401 };
402
403 const auto &sendErrorToWorker = [&]( const uint32_t reqId, const uint code, const std::string &reason, bool transient = false ) {
404 auto r = ProvideMessage::createErrorResponse ( reqId, code, reason, transient );
405 if ( !_messageStream->sendMessage( r.impl() ) ) {
406 ERR << "Failed to send Error message to worker process." << std::endl;
407 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
408 return false;
409 }
410 return true;
411 };
412
413 const bool doesDownload = this->_capabilities.worker_type() == Config::Downloading;
414 const bool fileNeedsCleanup = doesDownload || ( _capabilities.worker_type() == Config::CPUBound && _capabilities.cfg_flags() & Config::FileArtifacts );
415
416 while ( auto msg = _messageStream->nextMessage () ) {
417
418 if ( msg->messagetypename() == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
419
420 const auto &provMsg = ProvideMessage::create(*msg);
421 if ( !provMsg ) {
422 fatalWorkerError( provMsg.error() );
423 return;
424 }
425
426 const auto &reqIter = getRequest( provMsg );
427 if ( reqIter == _activeItems.end() ) {
428 if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
429 const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
430 if ( !_parent.isInCache(locFName) ) {
431 MIL << "Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
432 zypp::filesystem::unlink( locFName );
433 }
434 }
435 continue;
436 }
437
438 auto &req = *reqIter;
439 auto &reqRef =req._request;
440
441 const auto code = provMsg->code();
442
443 if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
444
445 // send the message to the item but don't dequeue
446 if ( reqRef && reqRef->owner() )
447 reqRef->owner()->informalMessage ( *this, reqRef, *provMsg );
448 continue;
449
450 } else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
451
452 if ( req._state == Item::Cancelling ) {
453 req._state = Item::Finished;
454 dequeueActive( reqIter );
455 continue;
456 }
457
458 if ( code == ProvideMessage::Code::ProvideFinished ) {
459
460 // we are going to register the file to the cache if this is a downloading worker, so it can not leak
461 // no matter if the item does the correct dance or not, this code is duplicated by all ProvideItems that receive ProvideFinished
462 // results that require file cleanups.
463 // we keep the ref around until after sending the result to the item. At that point it should take a reference
464 std::optional<zypp::ManagedFile> dataRef;
465
466 if ( !reqIter->isFileRequest() ) {
467 ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
469 return;
470 }
471
472 // when a worker is downloading we keep a internal book of cache files
473 if ( doesDownload ) {
474 const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
475 if ( provMsg->value( ProvideFinishedMsgFields::CacheHit, false ).asBool()) {
476 dataRef = _parent.addToFileCache ( locFName );
477 if ( !dataRef ) {
478 MIL << "CACHE MISS, file " << locFName << " was already removed, queueing again" << std::endl;
479 if ( reqRef->owner() )
480 reqRef->owner()->cacheMiss( reqRef );
481 reqRef->provideMessage().setRequestId( InvalidId );
482 req._state = Item::Pending;
483 _waitQueue.push_front( req );
484 dequeueActive( reqIter );
485 continue;
486 }
487 } else {
488 dataRef = _parent.addToFileCache ( locFName );
489
490 // unlikely this can happen but better be safe than sorry
491 if ( !dataRef ) {
492 req._state = Item::Finished;
493 reqRef->setCurrentQueue(nullptr);
494 auto resp = ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError, "File vanished between downloading and adding it to cache." );
495 if ( reqRef->owner() )
496 reqRef->owner()->finishReq( *this, reqRef, resp );
497 dequeueActive( reqIter );
498 continue;
499 }
500 }
501 }
502 }
503
504 // send the message to the item and dequeue
505 reqRef->setCurrentQueue(nullptr);
506 if ( reqRef->owner() )
507 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
508 req._state = Item::Finished;
509 dequeueActive( reqIter );
510 continue;
511
512 } else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
513
514 if ( req._state == Item::Cancelling ) {
515 req._state = Item::Finished;
516 dequeueActive( reqIter );
517 continue;
518 }
519
520 // send the message to the item and dequeue
521 reqRef->setCurrentQueue(nullptr);
522
523 if ( reqRef->owner() )
524 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
525
526 req._state = Item::Finished;
527 dequeueActive( reqIter );
528 continue;
529
530 } else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
531
532 // redir is like a finished message, we can simply forgot about a cancelling request
533 if ( req._state == Item::Cancelling ) {
534 req._state = Item::Finished;
535 dequeueActive( reqIter );
536 continue;
537 }
538
539 // send the message to the item and dequeue
540 reqRef->setCurrentQueue(nullptr);
541 if ( reqRef->owner() )
542 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
543 req._state = Item::Finished;
544 dequeueActive( reqIter );
545 continue;
546
547 } else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
548
549 ERR << "Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
550 fatalWorkerError ( ZYPP_EXCPT_PTR( zypp::media::MediaException("Controller message received from worker.") ) );
551 return;
552
553 } else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
554
555 if ( code == ProvideMessage::Code::AuthDataRequest ) {
556 if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
557 ERR << "Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
559 return;
560 }
561
562 // if the file was cancelled we send a failure back
563 if( reqIter->_state == Item::Cancelling ) {
564 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item was cancelled") )
565 return;
566 continue;
567 }
568
569 // we need a owner item to fetch the auth data for us
570 if ( !reqRef->owner() ) {
571 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Request has no owner" ) )
572 return;
573 continue;
574 }
575
576 if ( !reqRef->activeUrl() ) {
577 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item has no active URL, this is a bug." ) )
578 return;
579 continue;
580 }
581
582 try {
583 zypp::Url u( provMsg->value( AuthDataRequestMsgFields::EffectiveUrl ).asString() );
584
585 std::map<std::string, std::string> extraVals;
586 provMsg->forEachVal( [&]( const std::string &name, const zyppng::ProvideMessage::FieldVal &val ) {
587
590 return true;
591
592 if ( !val.isString() ) {
593 WAR << "Ignoring non string value for " << name << std::endl;
594 return true;
595 }
596
597 extraVals[name] = val.asString();
598 return true;
599 });
600
601 const auto &authOpt = reqRef->owner()->authenticationRequired( *this, reqRef, u, provMsg->value( AuthDataRequestMsgFields::LastAuthTimestamp ).asInt64(), extraVals );
602 if ( !authOpt ) {
603 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "No auth given by user." ) )
604 return;
605 continue;
606 }
607
608 auto r = ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
609 if ( !_messageStream->sendMessage( r.impl() ) ) {
610 ERR << "Failed to send AuthorizationInfo to worker process." << std::endl;
611 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
612 return;
613 }
614 continue;
615
616 } catch ( const zypp::Exception &e ) {
617 ZYPP_CAUGHT(e);
618 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.asString() ) )
619 return;
620 continue;
621 }
622
623 } else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
624
625 if ( !reqIter->isAttachRequest() ) {
626 ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
628 return;
629 }
630
631 // if the file was cancelled we send a failure back
632 if( reqIter->_state == Item::Cancelling ) {
633 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Item was cancelled" ) )
634 return;
635 continue;
636 }
637
638 MIL << "Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
639
640 //const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc
641 std::vector<std::string> freeDevs;
642 for ( const auto &val : provMsg->values( MediaChangeRequestMsgFields::Device) ) {
643 freeDevs.push_back( val.asString() );
644 }
645
646 std::optional<std::string> desc;
647 const auto &descVal = provMsg->value( MediaChangeRequestMsgFields::Desc );
648 if ( descVal.valid () && descVal.isString() )
649 desc = descVal.asString();
650
651 auto res = _parent._sigMediaChange.emit(
652 _parent.queueName(*this),
653 provMsg->value( MediaChangeRequestMsgFields::Label ).asString(),
654 provMsg->value( MediaChangeRequestMsgFields::MediaNr ).asInt(),
655 freeDevs,
656 desc
657 );
658
659 auto action = res ? *res : Provide::Action::ABORT;
660 switch ( action ) {
662 MIL << "Sending back a MediaChanged message, retrying to find medium " << std::endl;
663 auto r = ProvideMessage::createMediaChanged ( reqIter->_request->provideMessage().requestId() );
664 if ( !_messageStream->sendMessage( r.impl() ) ){
665 ERR << "Failed to send MediaChanged to worker process." << std::endl;
666 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
667 return;
668 }
669 continue;
670 }
672 MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
673 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Cancelled by User" ) )
674 return;
675 continue;
676 }
678 MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
679 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip, "Skipped by User" ) )
680 return;
681 continue;
682 }
683 }
684 } else {
685 // if there is a unsupported worker request we need to stop immediately because the worker will be blocked until it gets a answer
686 ERR << "Unsupported worker request: "<<code<<", this is a fatal error!" << std::endl;
688 return;
689 }
690
691 } else {
692 // unknown code
693 ERR << "Received unsupported message " << msg->messagetypename() << " with code " << code << " ignoring! " << std::endl;
694 }
695
696 } else {
697 ERR << "Received unsupported message " << msg->messagetypename() << "ignoring" << std::endl;
698 }
699 }
700 }
701
707 {
708 // read all stderr data so we get the full logs
709 auto ba = _workerProc->channelReadLine(Process::StdErr);
710 while ( !ba.empty() ) {
711 forwardToLog(std::string( ba.data(), ba.size() ) );
712 ba = _workerProc->channelReadLine(Process::StdErr);
713 }
714 }
715
716 void ProvideQueue::forwardToLog( std::string &&logLine )
717 {
718 if ( (_capabilities.cfg_flags () & zypp::proto::Capabilities::ZyppLogFormat) == zypp::proto::Capabilities::ZyppLogFormat )
719 zypp::base::LogControl::instance ().logRawLine( std::move(logLine) );
720 else
721 MIL << "Message from worker: " << _capabilities.worker_name() << ":" << logLine << std::endl;
722 }
723
725 // ignore stdout here
726 if ( channel == Process::StdOut )
727 return;
728
729 // forward the stderr output to the log bypassing the formatter
730 // the worker already formatted the line
731 while ( _workerProc->canReadLine(Process::StdErr) ) {
732 const auto &data = _workerProc->channelReadLine( Process::StdErr );
733 if ( data.empty() )
734 return;
735
736 forwardToLog(std::string( data.data(), data.size() ) );
737 }
738 }
739
// Handler for the end of the worker process (see the Process::sigFinished connection).
// Unless the queue is already shutting down, immediateShutdown() is called with a
// MediaException("Unexpected queue worker exit!").
//
// \a exitCode is only referenced inside the disabled "#if 0" restart sketch below.
//
// NOTE(review): the embedded listing numbering skips 743, 746 and 756. The two comments
// "process all pending messages" / "get all of the log lines" therefore have no visible
// statement following them - presumably the corresponding calls were lost in extraction;
// verify against the real source.
740 void ProvideQueue::procFinished(int exitCode)
741 {
742 // process all pending messages
744
745 // get all of the log lines
747
748 // shut down
749 // @todo implement worker restart in case of a unexpected exit
750 if ( !_queueShuttingDown )
751 immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
752
// disabled draft of a worker restart with a crash counter; never compiled
753#if 0
754 if ( !_queueShuttingDown ) {
755
757 if ( _crashCounter > 3 ) {
758 immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
759 return;
760 }
761
762 MIL << "Unexpected queue worker exit with code: " << exitCode << std::endl;
763 // try to spawn the worker again, move active items back to wait list and start over
764
765 if ( !doStartup () ) {
766
767 }
768 }
769#endif
770 }
771
773 return _parent.nextRequestId();
774 }
775}
Provides API related macros.
#define LIBZYPP_VERSION_STRING
Definition: APIConfig.h:15
Store and operate with byte count.
Definition: ByteCount.h:31
Base class for Exception.
Definition: Exception.h:146
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:75
Url manipulation class.
Definition: Url.h:92
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
Definition: LogControl.cc:912
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
Wrapper class for stat/lstat.
Definition: PathInfo.h:221
const char * c_str() const
String representation.
Definition: Pathname.h:110
const std::string & asString() const
String representation.
Definition: Pathname.h:91
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
Just inherits Exception to separate media exceptions.
const std::string & asString() const
bool isString() const
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createCancel(const uint32_t reqId)
static ProvideMessage createMediaChanged(const uint32_t reqId)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:835
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:727
bool isRunning() const
Definition: provide.cc:844
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:747
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
Definition: provide_p.h:104
void schedule(ScheduleReason reason)
Definition: provide.cc:38
uint32_t nextRequestId()
Definition: provide.cc:917
const zypp::Pathname & workerPath() const
Definition: provide.cc:830
RpcMessageStreamPtr _messageStream
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::ByteCount expectedProvideSize() const
Process::Ptr _workerProc
ProvideQueue(ProvidePrivate &parent)
Definition: providequeue.cc:44
void immediateShutdown(const std::exception_ptr &reason)
std::deque< Item > _waitQueue
void cancel(ProvideRequest *item, std::exception_ptr error)
uint requestCount() const
zypp::Pathname _currentExe
Signal< void()> _sigIdle
bool canScheduleMore() const
zypp::proto::Capabilities Config
void processReadyRead(int channel)
const std::string & hostname() const
std::optional< TimePoint > _idleSince
ProvidePrivate & _parent
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:57
const Config & workerConfig() const
std::optional< TimePoint > idleSince() const
uint32_t nextRequestId()
void procFinished(int exitCode)
uint activeRequests() const
std::list< Item > _activeItems
void forwardToLog(std::string &&logLine)
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:92
SignalProxy< void()> sigIdle()
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
static constexpr uint32_t InvalidId
int unlink(const Pathname &path)
Like 'unlink'.
Definition: PathInfo.cc:700
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
Definition: PathInfo.cc:319
constexpr std::string_view EffectiveUrl("effective_url")
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
constexpr std::string_view Label("label")
constexpr std::string_view Desc("desc")
constexpr std::string_view MediaNr("media_nr")
constexpr std::string_view Device("device")
constexpr std::string_view LocalFilename("local_filename")
constexpr std::string_view CacheHit("cacheHit")
constexpr std::string_view ExpectedFilesize("expected_filesize")
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
bool provideDebugEnabled()
Definition: providedbg_p.h:28
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
ProvideRequestRef _request
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition: Exception.h:436
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
#define DBG
Definition: Logger.h:95
#define MIL
Definition: Logger.h:96
#define ERR
Definition: Logger.h:98