16#include <zypp-core/fs/PathInfo.h>
17#include <zypp-core/zyppng/rpc/MessageStream>
18#include <zypp-core/base/StringV.h>
20#include <zypp-media/MediaException>
21#include <zypp-media/auth/CredentialManager>
31 return (
_request->code () == ProvideMessage::Code::Attach );
36 return ( _request->code () == ProvideMessage::Code::Provide );
41 return ( _request->code () == ProvideMessage::Code::Detach );
50 if ( this->
_activeItems.size() || this->_waitQueue.size() ) {
51 DBG <<
"Queue shutdown with Items still running" << std::endl;
60 ERR <<
"Queue Worker was already initialized" << std::endl;
67 MIL <<
"Trying to start " << pN << std::endl;
69 if ( !pi.isExist() ) {
70 ERR <<
"Failed to find worker for " << workerScheme << std::endl;
74 if ( !pi.userMayX() ) {
75 ERR <<
"Failed to start worker for " << workerScheme <<
" binary " << pi.asString() <<
" is not executable." << std::endl;
80 ERR <<
"Failed to assert working directory '" << workDir <<
"' for worker " << workerScheme << std::endl;
97 request->setCurrentQueue( shared_this<ProvideQueue>() );
105 const auto &isSameItem = [item](
const Item &i ){
106 if ( i.isDetachRequest () )
108 return i._request.get() == item;
114 if ( item->
code() != ProvideMessage::Code::Attach
115 && item->
code() != ProvideMessage::Code::Provide ) {
116 ERR <<
"Can not cancel a " << item->
code() <<
" request!" << std::endl;
121 auto &reqRef = i->_request;
122 reqRef->setCurrentQueue(
nullptr);
123 if ( reqRef->owner() )
124 reqRef->owner()->finishReq(
this, reqRef, error );
138 it->_request->setCurrentQueue(
nullptr );
157 auto &reqRef = item._request;
158 if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
159 reqRef->owner()->finishReq(
this, reqRef, reason );
164 auto &reqRef = i->_request;
165 if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
182 auto &reqRef = i->_request;
190 reqRef->setCurrentQueue(
nullptr);
191 if ( reqRef->owner() )
192 reqRef->owner()->finishReq(
this, reqRef, error );
199 ERR <<
"Failed to send cancel message to worker" << std::endl;
202 reqRef->setCurrentQueue(
nullptr);
203 if ( reqRef->owner() )
204 reqRef->owner()->finishReq(
this, reqRef, error );
218 auto &reqRef = item._request;
219 if ( !reqRef->activeUrl() ) {
220 ERR <<
"Item without active URL enqueued, this is a BUG." << std::endl;
221 if ( reqRef->owner() )
226 if ( !
_messageStream->sendMessage( reqRef->provideMessage().impl() ) ) {
227 ERR <<
"Failed to send message to worker process." << std::endl;
240 _idleSince = std::chrono::steady_clock::now();
247 return (
_activeItems.size() == 0 || (
_capabilities.cfg_flags () & zypp::proto::Capabilities::Pipeline ) == zypp::proto::Capabilities::Pipeline );
279 if ( i.isDetachRequest () )
282 auto &reqRef = i._request;
283 if ( reqRef->code() != ProvideMessage::Code::Provide )
288 if ( i.isDetachRequest () )
290 auto &reqRef = i._request;
291 if ( reqRef->code() != ProvideMessage::Code::Provide )
321 ERR <<
"Failed to execute worker" << std::endl;
334 zypp::proto::Configuration conf;
338 conf.mutable_values ()->insert ( {
PROVIDER_ROOT.data (),
_parent.z_func()->providerWorkdir().asString() } );
340 const auto &cleanupOnErr = [&](){
349 ERR <<
"Failed to send initial message to queue worker" << std::endl;
350 return cleanupOnErr();
358 if ( !caps || caps->messagetypename() != rpc::messageTypeName<zypp::proto::Capabilities>() ) {
359 ERR <<
"Worker did not sent a capabilities message, aborting" << std::endl;
360 return cleanupOnErr();
364 auto p =
_messageStream->parseMessage<zypp::proto::Capabilities>( *caps );
366 return cleanupOnErr();
385 const auto &getRequest = [&](
const auto &exp ) ->
decltype(
_activeItems)::iterator {
387 ERR <<
"Ignoring invalid request!" << std::endl;
392 return exp->requestId() == elem._request->provideMessage().requestId();
396 ERR <<
"Ignoring unknown request ID: " << exp->requestId() << std::endl;
403 const auto &sendErrorToWorker = [&](
const uint32_t reqId,
const uint code,
const std::string &reason,
bool transient = false ) {
406 ERR <<
"Failed to send Error message to worker process." << std::endl;
413 const bool doesDownload = this->
_capabilities.worker_type() == Config::Downloading;
414 const bool fileNeedsCleanup = doesDownload || (
_capabilities.worker_type() == Config::CPUBound &&
_capabilities.cfg_flags() & Config::FileArtifacts );
418 if ( msg->messagetypename() == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
426 const auto &reqIter = getRequest( provMsg );
428 if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
431 MIL <<
"Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
438 auto &req = *reqIter;
439 auto &reqRef =req._request;
441 const auto code = provMsg->code();
443 if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
446 if ( reqRef && reqRef->owner() )
447 reqRef->owner()->informalMessage ( *
this, reqRef, *provMsg );
450 }
else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
458 if ( code == ProvideMessage::Code::ProvideFinished ) {
464 std::optional<zypp::ManagedFile> dataRef;
466 if ( !reqIter->isFileRequest() ) {
467 ERR <<
"Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
473 if ( doesDownload ) {
478 MIL <<
"CACHE MISS, file " << locFName <<
" was already removed, queueing again" << std::endl;
479 if ( reqRef->owner() )
480 reqRef->owner()->cacheMiss( reqRef );
481 reqRef->provideMessage().setRequestId(
InvalidId );
493 reqRef->setCurrentQueue(
nullptr);
494 auto resp =
ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError,
"File vanished between downloading and adding it to cache." );
495 if ( reqRef->owner() )
496 reqRef->owner()->finishReq( *
this, reqRef, resp );
505 reqRef->setCurrentQueue(
nullptr);
506 if ( reqRef->owner() )
507 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
512 }
else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
521 reqRef->setCurrentQueue(
nullptr);
523 if ( reqRef->owner() )
524 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
530 }
else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
540 reqRef->setCurrentQueue(
nullptr);
541 if ( reqRef->owner() )
542 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
547 }
else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
549 ERR <<
"Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
553 }
else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
555 if ( code == ProvideMessage::Code::AuthDataRequest ) {
556 if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
557 ERR <<
"Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
564 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item was cancelled") )
570 if ( !reqRef->owner() ) {
571 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Request has no owner" ) )
576 if ( !reqRef->activeUrl() ) {
577 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item has no active URL, this is a bug." ) )
585 std::map<std::string, std::string> extraVals;
593 WAR <<
"Ignoring non string value for " << name << std::endl;
603 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"No auth given by user." ) )
608 auto r =
ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
610 ERR <<
"Failed to send AuthorizationInfo to worker process." << std::endl;
618 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.
asString() ) )
623 }
else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
625 if ( !reqIter->isAttachRequest() ) {
626 ERR <<
"Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
633 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Item was cancelled" ) )
638 MIL <<
"Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
641 std::vector<std::string> freeDevs;
643 freeDevs.push_back( val.asString() );
646 std::optional<std::string> desc;
648 if ( descVal.valid () && descVal.isString() )
649 desc = descVal.asString();
662 MIL <<
"Sending back a MediaChanged message, retrying to find medium " << std::endl;
665 ERR <<
"Failed to send MediaChanged to worker process." << std::endl;
672 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
673 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Cancelled by User" ) )
678 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
679 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip,
"Skipped by User" ) )
686 ERR <<
"Unsupported worker request: "<<code<<
", this is a fatal error!" << std::endl;
693 ERR <<
"Received unsupported message " << msg->messagetypename() <<
" with code " << code <<
" ignoring! " << std::endl;
697 ERR <<
"Received unsupported message " << msg->messagetypename() <<
"ignoring" << std::endl;
709 auto ba =
_workerProc->channelReadLine(Process::StdErr);
710 while ( !ba.empty() ) {
712 ba =
_workerProc->channelReadLine(Process::StdErr);
718 if ( (
_capabilities.cfg_flags () & zypp::proto::Capabilities::ZyppLogFormat) == zypp::proto::Capabilities::ZyppLogFormat )
721 MIL <<
"Message from worker: " <<
_capabilities.worker_name() <<
":" << logLine << std::endl;
726 if ( channel == Process::StdOut )
731 while (
_workerProc->canReadLine(Process::StdErr) ) {
732 const auto &data =
_workerProc->channelReadLine( Process::StdErr );
762 MIL <<
"Unexpected queue worker exit with code: " << exitCode << std::endl;
Provides API related macros.
#define LIBZYPP_VERSION_STRING
Store and operate with byte count.
Base class for Exception.
std::string asString() const
Error message provided by dumpOn as string.
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
static LogControl instance()
Singleton access.
Wrapper class for stat/lstat.
const char * c_str() const
String representation.
const std::string & asString() const
String representation.
bool empty() const
Test for an empty path.
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createCancel(const uint32_t reqId)
static ProvideMessage createMediaChanged(const uint32_t reqId)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
const std::string queueName(ProvideQueue &q) const
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
bool isInCache(const zypp::Pathname &downloadedFile) const
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
void schedule(ScheduleReason reason)
const zypp::Pathname & workerPath() const
RpcMessageStreamPtr _messageStream
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::ByteCount expectedProvideSize() const
ProvideQueue(ProvidePrivate &parent)
void immediateShutdown(const std::exception_ptr &reason)
std::deque< Item > _waitQueue
void cancel(ProvideRequest *item, std::exception_ptr error)
uint requestCount() const
zypp::Pathname _currentExe
bool canScheduleMore() const
zypp::proto::Capabilities Config
void processReadyRead(int channel)
const std::string & hostname() const
std::optional< TimePoint > _idleSince
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const Config & workerConfig() const
std::optional< TimePoint > idleSince() const
void procFinished(int exitCode)
uint activeRequests() const
std::list< Item > _activeItems
void forwardToLog(std::string &&logLine)
void enqueue(ProvideRequestRef request)
SignalProxy< void()> sigIdle()
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
static constexpr uint32_t InvalidId
int unlink(const Pathname &path)
Like 'unlink'.
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
constexpr std::string_view EffectiveUrl("effective_url")
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
constexpr std::string_view LocalFilename("local_filename")
constexpr std::string_view CacheHit("cacheHit")
constexpr std::string_view ExpectedFilesize("expected_filesize")
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
bool provideDebugEnabled()
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
bool isDetachRequest() const
ProvideRequestRef _request
bool isAttachRequest() const
bool isFileRequest() const
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.