libzypp 17.31.23
provideworker.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8\---------------------------------------------------------------------*/
9
10#include "provideworker.h"
11#include <zypp-core/base/DtorReset>
12#include <zypp-core/AutoDispose.h>
13#include <zypp-core/Url.h>
14#include <zypp-core/Date.h>
15#include <zypp-core/zyppng/pipelines/AsyncResult>
16#include <zypp-core/base/LogControl.h>
17#include <zypp-core/fs/PathInfo.h>
18#include <zypp-core/fs/TmpPath.h>
19#include <zypp-core/zyppng/base/private/threaddata_p.h>
20#include <zypp-core/zyppng/base/AutoDisconnect>
21#include <zypp-core/zyppng/base/EventDispatcher>
22#include <zypp-media/MediaConfig>
23#include <ostream>
24#include <fstream>
25
27
28#undef ZYPP_BASE_LOGGER_LOGGROUP
29#define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
30
31namespace zyppng::worker {
32
33 using namespace zyppng::operators;
34
35 RequestCancelException::RequestCancelException() : zypp::media::MediaException ("Request was cancelled")
36 { }
37
38 ProvideWorker::ProvideWorker(std::string_view workerName) : _workerName(workerName)
39 {
40 // do not change the order of these calls, otherwise showing the threadname does not work
41 // enableLogForwardingMode will initialize the log which would override the current thread name
43 ThreadData::current().setName( workerName );
44
45 // we use a singleshot timer that triggers message handling
46 connect( *_msgAvail, &Timer::sigExpired, *this, &ProvideWorker::messageLoop );
47 _msgAvail->setSingleShot(true);
48
49 // another timer to trigger a delayed shutdown
50 connectFunc( *_delayedShutdown, &Timer::sigExpired, [this]( zyppng::Timer & ) {
52 }, *this );
53 _delayedShutdown->setSingleShot(true);
54 }
55
57 { }
58
59 RpcMessageStream::Ptr ProvideWorker::messageStream() const
60 {
61 return _stream;
62 }
63
64 expected<void> ProvideWorker::run( int recv, int send )
65 {
66 // reentry not supported
67 assert ( !_isRunning );
68
70 _isRunning = true;
71
72 initLog();
73
74 zypp::OnScopeExit cleanup([&](){
75 _stream.reset();
76 _controlIO.reset();
77 _loop.reset();
78 });
79
80 _controlIO = AsyncDataSource::create();
81 if ( !_controlIO->openFds( { recv }, send ) ) {
82 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to open control FDs")) );
83 }
84
85 connect( *_controlIO, &AsyncDataSource::sigReadFdClosed, *this, &ProvideWorker::readFdClosed );
86 connect( *_controlIO, &AsyncDataSource::sigWriteFdClosed, *this, &ProvideWorker::writeFdClosed );
87
88 _stream = RpcMessageStream::create( _controlIO );
89
90 return executeHandshake () | mbind( [&]() {
91 AutoDisconnect disC[] = {
92 connect( *_stream, &RpcMessageStream::sigMessageReceived, *this, &ProvideWorker::messageReceived ),
93 connect( *_stream, &RpcMessageStream::sigInvalidMessageReceived, *this, &ProvideWorker::onInvalidMessageReceived )
94 };
95 _loop->run();
96 if ( _fatalError )
97 return expected<void>::error( _fatalError );
98 return expected<void>::success();
99 });
100 }
101
102 std::deque<ProvideWorkerItemRef> &ProvideWorker::requestQueue()
103 {
104 return _pendingProvides;
105 }
106
109 }
110
113 }
114
116 {
117 // by default we log to strErr, if user code wants to change that it can overload this function
119 }
120
121 ProvideWorkerItemRef ProvideWorker::makeItem( ProvideMessage &&spec )
122 {
123 return std::make_shared<ProvideWorkerItem>( std::move(spec) );
124 }
125
126 void ProvideWorker::provideStart(const uint32_t id, const zypp::Url &url, const zypp::filesystem::Pathname &localFile, const zypp::Pathname &stagingFile )
127 {
128 if ( !_stream->sendMessage( ProvideMessage::createProvideStarted ( id
129 , url
130 , localFile.empty () ? std::optional<std::string>() : localFile.asString ()
131 , stagingFile.empty () ? std::optional<std::string>() : stagingFile.asString ()
132 ).impl() ) ) {
133 ERR << "Failed to send ProvideStart message" << std::endl;
134 }
135 }
136
137 void ProvideWorker::provideSuccess(const uint32_t id, bool cacheHit, const zypp::filesystem::Pathname &localFile, const HeaderValueMap extra )
138 {
139 MIL_PRV << "Sending provideSuccess for id " << id << " file " << localFile << std::endl;
140 auto msg = ProvideMessage::createProvideFinished( id ,localFile.asString() ,cacheHit);
141 for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
142 for ( const auto &val : i->second )
143 msg.addValue( i->first, val );
144 }
145 if ( !_stream->sendMessage( msg.impl() ) ) {
146 ERR << "Failed to send ProvideSuccess message" << std::endl;
147 }
148 }
149
150 void ProvideWorker::provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra )
151 {
152 MIL_PRV << "Sending provideFailed for request " << id << " err: " << reason << std::endl;
153 auto msg = ProvideMessage::createErrorResponse ( id, code, reason, transient );
154 for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
155 for ( const auto &val : i->second )
156 msg.addValue( i->first, val );
157 }
158 if ( !_stream->sendMessage( msg.impl() ) ) {
159 ERR << "Failed to send ProvideFailed message" << std::endl;
160 }
161 }
162
163
164 void ProvideWorker::provideFailed ( const uint32_t id, const uint code, const bool transient, const zypp::Exception &e )
165 {
167 if ( !e.historyEmpty() ) {
169 }
170 provideFailed( id
171 , code
172 , e.asUserString()
173 , transient
174 , extra );
175 }
176
177
178 void ProvideWorker::attachSuccess(const uint32_t id)
179 {
180 MIL_PRV << "Sending attachSuccess for request " << id << std::endl;
181 if ( !_stream->sendMessage( ProvideMessage::createAttachFinished ( id ).impl() ) ) {
182 ERR << "Failed to send AttachFinished message" << std::endl;
183 } else {
184 MIL << "Sent back attach success" << std::endl;
185 }
186 }
187
188 void ProvideWorker::detachSuccess(const uint32_t id)
189 {
190 MIL_PRV << "Sending detachSuccess for request " << id << std::endl;
191 if ( !_stream->sendMessage( ProvideMessage::createDetachFinished ( id ).impl() ) ) {
192 ERR << "Failed to send DetachFinished message" << std::endl;
193 }
194 }
195
196 expected<ProvideMessage> ProvideWorker::sendAndWaitForResponse( const ProvideMessage &request , const std::vector<uint> &responseCodes )
197 {
198 // make sure immediateShutdown is not called while we are blocking here
201
202 if ( !_stream->sendMessage( request.impl() ) )
203 return expected<ProvideMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send message")) );
204
205 // flush the io device, this will block until all bytes are written
206 _controlIO->flush();
207
208 while ( !_fatalError ) {
209
210 const auto &msg = _stream->nextMessageWait() | [&]( auto &&nextMessage ) {
211 if ( !nextMessage ) {
212 if ( _fatalError )
213 return expected<RpcMessage>::error( _fatalError );
214 else
215 return expected<RpcMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to wait for response")) );
216 }
217 return expected<RpcMessage>::success( std::move(*nextMessage) );
218 } | mbind ( [&]( auto && m) {
219 return parseReceivedMessage(m);
220 } );
221
222 if ( !msg ) {
223 ERR << "Failed to receive message" << std::endl;
224 return msg;
225 }
226
227 if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
228 return msg;
229 }
230
231 // remember other messages for later
232 MIL << "Remembering message for later: " << msg->code () << std::endl;
233 _pendingMessages.push_back(*msg);
234 _msgAvail->start(0);
235 }
236 return expected<ProvideMessage>::error( _fatalError );
237 }
238
239 ProvideWorker::MediaChangeRes ProvideWorker::requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc )
240 {
241 return sendAndWaitForResponse( ProvideMessage::createMediaChangeRequest ( id, label, mediaNr, devices, desc ), { ProvideMessage::Code::MediaChanged, ProvideMessage::Code::MediaChangeAbort, ProvideMessage::Code::MediaChangeSkip } )
242 | [&]( expected<ProvideMessage> &&m ) {
243 if ( !m ) {
244 MIL << "Failed to wait for message, aborting the request " << std::endl;
246 }
247 MIL << "Wait finished, with messages still pending: " << this->_pendingMessages.size() << " and provs still pending: " << this->_pendingProvides.size() << std::endl;
248 if ( m->code() == ProvideMessage::Code::MediaChanged )
250 else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
252 else
254 };
255 }
256
257 expected<AuthInfo> ProvideWorker::requireAuthorization( const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername, const int64_t lastTimestamp, const std::map<std::string, std::string> &extraFields )
258 {
259 return sendAndWaitForResponse( ProvideMessage::createAuthDataRequest( id, url, lastTriedUsername, lastTimestamp, extraFields ), { ProvideMessage::Code::AuthInfo, ProvideMessage::Code::NoAuthData } )
260 | mbind( [&]( ProvideMessage &&m ) {
261 if ( m.code() == ProvideMessage::Code::AuthInfo ) {
262
263 AuthInfo inf;
264 m.forEachVal( [&]( const std::string &name, const ProvideMessage::FieldVal &val ) {
265 if ( name == AuthInfoMsgFields::Username ) {
266 inf.username = val.asString();
267 } else if ( name == AuthInfoMsgFields::Password ) {
268 inf.password = val.asString();
269 } else if ( name == AuthInfoMsgFields::AuthTimestamp ) {
270 inf.last_auth_timestamp = val.asInt64();
271 } else {
272 if ( !val.isString() ) {
273 ERR << "Ignoring invalid extra value, " << name << " is not of type string" << std::endl;
274 }
275 inf.extraKeys[name] = val.asString();
276 }
277 return true;
278 });
279 return expected<AuthInfo>::success(inf);
280
281 }
282 return expected<AuthInfo>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("No Auth data")) );
283 });
284 }
285
286 AsyncDataSource &ProvideWorker::controlIO()
287 {
288 return *_controlIO.get();
289 }
290
291 expected<void> ProvideWorker::executeHandshake()
292 {
293 const auto &helo = _stream->nextMessageWait();
294 if ( !helo ) {
295 ERR << "Could not receive a handshake message, aborting" << std::endl;
296 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to receive handshake message")) );;
297 }
298
299 auto exp = _stream->parseMessage<zypp::proto::Configuration>( *helo );
300 if ( !exp ) {
301 invalidMessageReceived( exp.error() );
302 return expected<void>::error(exp.error());
303 }
304
305 return std::move(*exp) | [&]( auto &&conf ) {
306
307 _workerConf = std::move(conf);
308
309 auto &mediaConf = zypp::MediaConfig::instance();
310 for( const auto &[key,value] : _workerConf.values() ) {
311 zypp::Url keyUrl( key );
312 if ( keyUrl.getScheme() == "zconfig" && keyUrl.getAuthority() == "main" ) {
313 mediaConf.setConfigValue( keyUrl.getAuthority(), zypp::Pathname(keyUrl.getPathName()).basename(), value );
314 }
315 }
316
317 return initialize( _workerConf ) | mbind([&]( WorkerCaps &&caps ){
318
319 caps.set_worker_name( _workerName.data() );
320
321 caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
322 if ( !_stream->sendMessage ( caps ) ) {
323 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send capabilities")) );
324 }
325 return expected<void>::success ();
326 });
327 };
328 }
329
330 void ProvideWorker::messageLoop( Timer & )
331 {
332 if ( _fatalError )
333 return;
334
335 while ( _pendingMessages.size () ) {
336 auto m = _pendingMessages.front ();
337 _pendingMessages.pop_front ();
338 handleSingleMessage(m);
339 }
340
341 if ( !_fatalError && _pendingProvides.size() ) {
342 provide();
343 }
344
345 // keep poking until there are no provides anymore
346 if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
347 _msgAvail->start(0);
348 }
349
350 }
351
352 void ProvideWorker::maybeDelayedShutdown()
353 {
354 if ( _inControllerRequest ) {
355 _delayedShutdown->start(0);
356 return;
357 }
358
359 immediateShutdown();
360 _loop->quit ();
361 }
362
363 void ProvideWorker::readFdClosed( uint, AsyncDataSource::ChannelCloseReason )
364 {
365 MIL << "Read FD closed, exiting." << std::endl;
366 maybeDelayedShutdown();
367 }
368
369 void ProvideWorker::writeFdClosed( AsyncDataSource::ChannelCloseReason )
370 {
371 MIL << "Write FD closed, exiting." << std::endl;
372 maybeDelayedShutdown();
373 }
374
375 void ProvideWorker::messageReceived()
376 {
377 while ( auto message = _stream->nextMessage() ) {
378 if ( _fatalError )
379 break;
380 pushSingleMessage(*message);
381 }
382 }
383
384 void ProvideWorker::onInvalidMessageReceived()
385 {
386 invalidMessageReceived( std::exception_ptr() );
387 }
388
389 void ProvideWorker::invalidMessageReceived( std::exception_ptr p )
390 {
391 ERR << "Received a invalid message on the input stream, aborting" << std::endl;
392 if ( p )
393 _fatalError = p;
394 else
395 _fatalError = ZYPP_EXCPT_PTR( InvalidMessageReceivedException() );
396 immediateShutdown();
397 _loop->quit();
398 }
399
400 void ProvideWorker::handleSingleMessage( const ProvideMessage &provide )
401 {
402 const auto code = provide.code();
403 // we only accept requests here
404 if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
405
406 MIL_PRV << "Received request: " << code << std::endl;
407
408 if ( code == ProvideMessage::Code::Cancel ) {
409 const auto &i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [ id = provide.requestId() ]( const auto &it ){ return it->_spec.requestId() == id; } );
410 if ( i != _pendingProvides.end() ) {
411 switch ( (*i)->_state ) {
412 case ProvideWorkerItem::Pending:
413 _stream->sendMessage ( ProvideMessage::createErrorResponse ( provide.requestId (), ProvideMessage::Code::Cancelled, "Cancelled by user." ).impl() );
414 _pendingProvides.erase(i);
415 break;
416 case ProvideWorkerItem::Running:
417 cancel(i);
418 break;
419 case ProvideWorkerItem::Finished:
420 break;
421 }
422 MIL << "Received Cancel for unknown request: " << provide.requestId() << ", ignoring!" << std::endl;
423 }
424 return;
425 }
426
427 _pendingProvides.push_back( makeItem (ProvideMessage(provide)) );
428 return;
429 }
430 ERR << "Unsupported request with code: " << code << " received!" << std::endl;
431 }
432
433 void ProvideWorker::pushSingleMessage( const RpcMessage &message )
434 {
435 const auto &handle = [&]( const RpcMessage &message ){
436 const auto &msgTypeName = message.messagetypename();
437 if ( msgTypeName == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
438 return parseReceivedMessage( message )
439 | mbind( [&]( ProvideMessage &&provide ){
440 _pendingMessages.push_back(provide);
441 _msgAvail->start(0);
442 return expected<void>::success();
443 });
444 }
445 return expected<void>::error( ZYPP_EXCPT_PTR( std::invalid_argument(zypp::str::Str()<<"Unknown message received: " << message.messagetypename())) );
446 };
447
448 const auto &exp = handle( message );
449 if ( !exp ) {
450 try {
451 std::rethrow_exception ( exp.error () );
452 } catch ( const zypp::Exception &e ) {
453 ERR << "Catched exception during message handling: " << e << std::endl;
454 } catch ( const std::exception &e ) {
455 ERR << "Catched exception during message handling: " << e.what()<< std::endl;
456 } catch ( ... ) {
457 ERR << "Unknown Exception during message handling" << std::endl;
458 }
459 }
460 }
461
462 expected<ProvideMessage> ProvideWorker::parseReceivedMessage(const RpcMessage &m)
463 {
464 auto exp = ProvideMessage::create(m);
465 if ( !exp )
466 invalidMessageReceived( exp.error() );
467 return exp;
468 }
469}
Assign a vaiable a certain value when going out of scope.
Definition: dtorreset.h:50
Base class for Exception.
Definition: Exception.h:146
std::string asUserString() const
Translated error message as string suitable for the user.
Definition: Exception.cc:82
std::string historyAsString() const
The history as string.
Definition: Exception.cc:146
virtual const char * what() const
Return message string.
Definition: Exception.h:313
bool historyEmpty() const
Whether the history list is empty.
Definition: Exception.h:262
static MediaConfig & instance()
Definition: mediaconfig.cc:46
Url manipulation class.
Definition: Url.h:92
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:533
std::string getAuthority() const
Returns the encoded authority component of the URL.
Definition: Url.cc:541
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
Definition: Url.cc:604
void logToStdErr()
Log to std::err.
Definition: LogControl.cc:894
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
void enableLogForwardingMode(bool enable=true)
Definition: LogControl.cc:881
const std::string & asString() const
String representation.
Definition: Pathname.h:91
std::string basename() const
Return the last component of this path.
Definition: Pathname.h:128
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
Just inherits Exception to separate media exceptions.
ValueMap::iterator beginList()
ValueMap::iterator endList()
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static ProvideMessage createAttachFinished(const uint32_t reqId)
zypp::proto::ProvideMessage & impl()
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
void detachSuccess(const uint32_t id)
std::deque< ProvideWorkerItemRef > _pendingProvides
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
ProvideWorker(std::string_view workerName)
expected< void > executeHandshake()
AsyncDataSource::Ptr _controlIO
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
std::deque< ProvideMessage > _pendingMessages
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
void attachSuccess(const uint32_t id)
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
RpcMessageStream::Ptr messageStream() const
RpcMessageStream::Ptr _stream
ProvideNotificatioMode provNotificationMode() const
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
void provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
std::deque< ProvideWorkerItemRef > & requestQueue()
ProvideNotificatioMode _provNotificationMode
expected< ProvideMessage > parseReceivedMessage(const RpcMessage &m)
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
std::exception_ptr _fatalError
Easy-to use interface to the ZYPP dependency resolver.
Definition: CodePitfalls.doc:2
constexpr std::string_view History("history")
zypp::proto::Capabilities WorkerCaps
Definition: provideworker.h:31
#define MIL_PRV
Definition: providedbg_p.h:35
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
Definition: String.h:212
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
#define MIL
Definition: Logger.h:96
#define ERR
Definition: Logger.h:98