libzypp 17.31.23
networkrequestdispatcher.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9#include <zypp/APIConfig.h>
14#include <zypp-core/zyppng/base/Timer>
15#include <zypp-core/zyppng/base/SocketNotifier>
16#include <zypp-core/zyppng/base/EventDispatcher>
18#include <assert.h>
19
20#include <zypp/base/Logger.h>
21#include <zypp/base/String.h>
22#include <zypp-core/base/DtorReset>
23
24using namespace boost;
25
26L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_CURL_DEBUG)
27
28
29namespace zyppng {
30
// Returns a reference to the default agent string. The value lives in a
// function-local static, so every call hands out the same string object.
// NOTE(review): original line 38 is not part of this listing. The dangling ')' on
// line 41 and the printf-style placeholder in the literal suggest that a formatting
// helper (presumably zypp::str::form) wrapped the two arguments below - confirm
// against the real source file.
31static const std::string & defaultAgentString()
32{
33 // we need to add the release and identifier to the
34 // agent string.
35 // The target could be not initialized, and then this information
36 // is guessed.
37 static const std::string _value(
 // format text: library version macro plus a slot for the curl version
39 "ZYpp " LIBZYPP_VERSION_STRING " (curl %s)"
 // the version member of the info block returned by curl_version_info()
40 , curl_version_info(CURLVERSION_NOW)->version
41 )
42 );
43 return _value;
44}
45
46
// Constructor of NetworkRequestDispatcherPrivate: creates the timer and the curl
// multi handle, then registers the timer and socket callbacks with curl.
// NOTE(review): the signature line (original line 47) and original line 53 are not
// part of this listing - confirm both against the real source file.
48 : BasePrivate( p )
49 , _timer( Timer::create() )
50 , _multi ( curl_multi_init() )
51 , _userAgent( defaultAgentString() )
52{
54
 // `this` is handed to curl as user data; the static callbacks cast it back
 // to NetworkRequestDispatcherPrivate to reach the instance
55 curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
56 curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
57 curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
58 curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
59
60 // disabled explicit pipelining since it breaks our tests on releases < 15.2
61 // we could consider enabling it starting with a specific CURL version
62 // curl_multi_setopt( _multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX|CURLPIPE_HTTP1 );
63
 // the timer is single shot; multi_timer_cb starts it again whenever curl asks for a timeout
64 _timer->setSingleShot( true );
65 _timer->connect( &Timer::sigExpired, *this, &NetworkRequestDispatcherPrivate::multiTimerTimout );
66}
67
// Presumably the destructor body of NetworkRequestDispatcherPrivate.
// NOTE(review): the signature (original line 68) and original line 70 are not part
// of this listing - confirm against the real source file.
69{
 // releases the curl multi handle stored in _multi
71 curl_multi_cleanup( _multi );
72}
73
74//called by curl to setup a timer
75int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
76{
77 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
78 assert( that != nullptr );
79
80 if ( timeout_ms >= 0 ) {
81 that->_timer->start( static_cast<uint64_t>(timeout_ms) );
82 } else {
83 //cancel the timer
84 that->_timer->stop();
85 }
86 return 0;
87}
88
// Presumably the body of NetworkRequestDispatcherPrivate::multiTimerTimout, the
// handler the constructor connects to Timer::sigExpired.
// NOTE(review): the signature (original line 89) is not part of this listing - confirm.
90{
 // no socket is involved: CURL_SOCKET_TIMEOUT with an empty event mask is passed on
91 handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
92}
93
94int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
95{
96 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
97 assert( that != nullptr );
98 return that->socketCallback( easy, s, what, socketp );
99}
100
101int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
102{
103 std::shared_ptr<SocketNotifier> socketp;
104
105 if ( _socketHandler.count( s ) == 0 ) {
106 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
107 return 0;
108
109 socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
110 _socketHandler.insert( std::make_pair( s, socketp ) );
111
112 socketp->connect( &SocketNotifier::sigActivated, *this, &NetworkRequestDispatcherPrivate::onSocketActivated );
113 } else {
114 socketp = _socketHandler[s];
115 }
116
117 //should never happen
118 if ( !socketp ) {
119 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
120 return 0;
121
122 if ( _socketHandler.count( s ) > 0 )
123 _socketHandler.erase( s );
124
125 void *privatePtr = nullptr;
126 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127 privatePtr = nullptr; //make sure this was not filled with bad info
128 }
129
130 if ( privatePtr ) {
131 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
132 //we stop the download, if we can not listen for socket changes we can not correctly do anything
133 setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
134 return 0;
135 } else {
136 //a broken handle without anything assigned, also should never happen but make sure and clean it up
137 WAR << "Cleaning up unassigned easy handle" << std::endl;
138 curl_multi_remove_handle( _multi, easy );
139 curl_easy_cleanup( easy );
140 return 0;
141 }
142 }
143
144 //remove the socket
145 if ( what == CURL_POLL_REMOVE ) {
146 socketp->setEnabled( false );
147 _socketHandler.erase( s );
148 return 0;
149 }
150
151 if ( what == CURL_POLL_IN ) {
152 socketp->setMode( SocketNotifier::Read );
153 } else if ( what == CURL_POLL_OUT ) {
154 socketp->setMode( SocketNotifier::Write );
155 } else if ( what == CURL_POLL_INOUT ) {
156 socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
157 }
158
159 socketp->setEnabled();
160 return 0;
161}
162
163void NetworkRequestDispatcherPrivate::onSocketActivated( const SocketNotifier &listener, int events )
164{
165 int evBitmask = 0;
166 if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167 evBitmask |= CURL_CSELECT_IN;
168 if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
169 evBitmask |= CURL_CSELECT_OUT;
170 if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
171 evBitmask |= CURL_CSELECT_ERR;
172
173 handleMultiSocketAction( listener.socket(), evBitmask );
174}
175
// Drives the curl multi handle for one socket event (or a timeout) and then
// processes all messages curl has queued.
//
// nativeSocket: socket the event happened on, or CURL_SOCKET_TIMEOUT
// evBitmask:    CURL_CSELECT_* bits describing the event
//
// NOTE(review): original lines 192, 232 and 242 are not part of this listing; they
// presumably declare the `err` / `e` error objects used below - confirm against the
// real source file.
176void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
177{
178 int running = 0;
179
180 // when inside a curl callback we can not call another multi curl API,
181 // for now just lock the thing, but we should consider rewriting this
182 // to post events instead of doing direct calls simply to decouple from
183 // that limitation
184 CURLMcode rc = CURLM_OK;
185 {
 // _locked is set only for the duration of the curl call; DtorReset assigns
 // the previous value back when this scope is left
186 zypp::DtorReset lockSet( _locked );
187 _locked = true;
188 rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
189 }
190 if (rc != 0) {
191 //we can not recover from a error like that, cancel all and stop
193 cancelAll( err );
194 //emit error
195 _lastError = err;
196 _sigError.emit( *z_func() );
197 return;
198 }
199
200 // make sure we dequeue pending requests ( in case a call to dequeue was blocked during the API call )
201 zypp::OnScopeExit scopeFinally([this](){
202 this->dequeuePending();
203 });
204
 // walk over all messages curl has queued; only CURLMSG_DONE is handled
205 int msgs_left = 0;
206 CURLMsg *msg = nullptr;
207 while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
208 if(msg->msg == CURLMSG_DONE) {
209 CURL *easy = msg->easy_handle;
210 CURLcode res = msg->data.result;
211
 // the private pointer of the easy handle is interpreted as the owning NetworkRequestPrivate
212 void *privatePtr = nullptr;
213 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214 WAR << "Unable to get CURLINFO_PRIVATE" << std::endl;
215 continue;
216 }
217
218 if ( !privatePtr ) {
219 //broken easy handle not associated, should never happen but clean it up
220 WAR << "Cleaning up unassigned easy handle" << std::endl;
221 curl_multi_remove_handle( _multi, easy );
222 curl_easy_cleanup( easy );
223 continue;
224 }
225
226 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
227 request->dequeueNotify();
228
 // the request reports more work and the transfer either succeeded or the
 // request says it can recover: prepare it and hand it back to curl
229 if ( request->hasMoreWork() && ( res == CURLE_OK || request->canRecover() ) ) {
230 std::string errBuf = "Broken easy handle in request";
231 if ( !request->_easyHandle ) {
233 setFinished( *request->z_func(), e );
234 continue;
235 }
236
237 // remove the handle from multi to change options
238 curl_multi_remove_handle( _multi, request->_easyHandle );
239
240 errBuf = "Failed to reinitialize the request";
241 if ( !request->prepareToContinue ( errBuf ) ) {
243 setFinished( *request->z_func(), e );
244 } else {
245 // add the request back to the multi handle, it is not done
246 if ( !addRequestToMultiHandle( *request->z_func() ) )
247 continue;
248
249 request->aboutToStart( );
250 }
251 } else {
252 //trigger notification about file downloaded
253 NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->errorMessage() );
254 setFinished( *request->z_func(), e );
255 }
256 //attention request could be deleted from here on
257 }
258 }
259}
260
// Finishes every running and every pending request with `result`.
// NOTE(review): the signature (original line 261) is not part of this listing; the
// body matches the declaration `void cancelAll(NetworkRequestError result)` - confirm.
262{
263 //prevent dequeuePending from filling up the runningDownloads again
264 zypp::DtorReset lockReset( _locked );
265 _locked = true;
266
 // NOTE(review): both loops only end if setFinished() removes the request
 // from its list - verify this also holds when setFinished() returns early
267 while ( _runningDownloads.size() ) {
268 std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
269 setFinished(*req, result );
270 }
271 while ( _pendingDownloads.size() ) {
272 std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
273 setFinished(*req, result );
274 }
275}
276
// Takes `req` out of the dispatcher, stores `result` in it and emits
// _sigDownloadFinished.
// NOTE(review): the signature (original line 277) and original line 322 are not part
// of this listing; the body matches the declaration
// `void setFinished(NetworkRequest &req, NetworkRequestError result)` - confirm.
278{
 // helper: removes the entry sharing req's private object from `list` and returns
 // it, or nullptr if the list has no such entry
279 auto delReq = []( auto &list, NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
280 auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
281 return req.d_func() == r->d_func();
282 } );
283 if ( it != list.end() ) {
284 auto ptr = *it;
285 list.erase( it );
286 return ptr;
287 }
288 return nullptr;
289 };
290
291 // We have a tricky situation if a network request is called when inside a callback. In those cases, it is
292 // not allowed to call curl_multi_remove_handle. We need to tell the callback to fail, so the download
293 // is cancelled by curl itself. We also need to store the current result for later
294 auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
295 if ( rmode ) {
296 if ( rmode->_isInCallback ) {
297 // the first cached result wins
298 if ( !rmode->_cachedResult )
299 rmode->_cachedResult = result;
300 return;
301 } else if ( rmode->_cachedResult ) {
 // a result cached during an earlier in-callback call replaces the one passed in
302 result = rmode->_cachedResult.value();
303 }
304 }
305
 // rLocked holds a shared_ptr copy of the list entry until the end of this
 // function - presumably to keep the request alive for the calls below
306 auto rLocked = delReq( _runningDownloads, req );
307 if ( !rLocked )
308 rLocked = delReq( _pendingDownloads, req );
309
310 void *easyHandle = req.d_func()->_easyHandle;
311 if ( easyHandle )
312 curl_multi_remove_handle( _multi, easyHandle );
313
314 req.d_func()->_dispatcher = nullptr;
315
316 //first set the result, the Request might have a checksum to check as well so a currently
317 //successful request could fail later on
318 req.d_func()->setResult( std::move(result) );
319 _sigDownloadFinished.emit( *z_func(), req );
320
321 //we got a open slot, try to dequeue or send the finished signals if all queues are empty
323}
324
// Adds the easy handle of `req` to the curl multi handle.
// Returns true if curl accepted the handle, false otherwise.
// NOTE(review): the signature (original line 325) and original line 329 are not part
// of this listing. Judging by the callers this is presumably
// NetworkRequestDispatcherPrivate::addRequestToMultiHandle, and the missing line
// presumably reports the failure before returning - confirm.
326{
327 CURLMcode rc = curl_multi_add_handle( _multi, req.d_func()->_easyHandle );
328 if ( rc != 0 ) {
330 return false;
331 }
332 return true;
333}
334
// Moves requests from the front of _pendingDownloads into _runningDownloads
// while there is capacity, and emits _sigQueueFinished once both queues are empty.
// NOTE(review): the signature (original line 335) and original line 350 are not part
// of this listing. Presumably this is NetworkRequestDispatcherPrivate::dequeuePending
// and the missing line finishes the request whose initialization failed - confirm.
336{
 // nothing is started while the dispatcher is not running or is locked
337 if ( !_isRunning || _locked )
338 return;
339
 // a _maxConnections of -1 disables the limit on running downloads
340 while ( _maxConnections == -1 || ( (std::size_t)_maxConnections > _runningDownloads.size() ) ) {
341 if ( !_pendingDownloads.size() )
342 break;
343
344 std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
345 _pendingDownloads.pop_front();
346
347 std::string errBuf = "Failed to initialize easy handle";
348 if ( !req->d_func()->initialize( errBuf ) ) {
349 //@TODO store the CURL error in the errors extra info
351 continue;
352 }
353
354 if ( !addRequestToMultiHandle( *req ) )
355 continue;
356
357 req->d_func()->aboutToStart();
358 _sigDownloadStarted.emit( *z_func(), *req );
359
360 _runningDownloads.push_back( std::move(req) );
361 }
362
363 //check for empty queues
364 if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
365 //once we finished all requests, cancel the timer too, so curl is not called without requests
366 _timer->stop();
367 _sigQueueFinished.emit( *z_func() );
368 }
369}
370
371ZYPP_IMPL_PRIVATE(NetworkRequestDispatcher)
372
373NetworkRequestDispatcher::NetworkRequestDispatcher( )
374 : Base( * new NetworkRequestDispatcherPrivate ( *this ) )
375{
376
377}
378
379bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
380{
381 curl_version_info_data *curl_info = nullptr;
382 curl_info = curl_version_info(CURLVERSION_NOW);
383 // curl_info does not need any free (is static)
384 if (curl_info->protocols)
385 {
386 const char * const *proto;
387 std::string scheme( url.getScheme() );
388 bool found = false;
389 for(proto=curl_info->protocols; !found && *proto; ++proto) {
390 if( scheme == std::string((const char *)*proto))
391 found = true;
392 }
393 return found;
394 }
395 return true;
396}
397
398void NetworkRequestDispatcher::setMaximumConcurrentConnections( const int maxConn )
399{
400 d_func()->_maxConnections = maxConn;
401}
402
403int NetworkRequestDispatcher::maximumConcurrentConnections () const
404{
405 return d_func()->_maxConnections;
406}
407
408void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
409{
410 if ( !req )
411 return;
412 Z_D();
413
414 if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
415 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
416 return;
417 }
418
419 if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
420 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
421 return;
422 }
423
424 req->d_func()->_dispatcher = this;
425 if ( req->priority() == NetworkRequest::Normal )
426 d->_pendingDownloads.push_back( req );
427 else {
428 auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ]( const auto &pendingReq ){
429 return pendingReq->priority() < prio;
430 });
431
432 //if we have a valid iterator, decrement we found a pending download request with lower prio, insert before that
433 if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
434 it--;
435 d->_pendingDownloads.insert( it, req );
436 }
437
438 //dequeue if running and we have capacity
439 d->dequeuePending();
440}
441
442void NetworkRequestDispatcher::setAgentString( const std::string &agent )
443{
444 Z_D();
445 if ( agent.empty() )
446 d->_userAgent = defaultAgentString();
447 else
448 d->_userAgent = agent;
449}
450
451const std::string &NetworkRequestDispatcher::agentString() const
452{
453 return d_func()->_userAgent;
454}
455
456void NetworkRequestDispatcher::setHostSpecificHeader( const std::string &host, const std::string &headerName, const std::string &value )
457{
458 Z_D();
459 if ( value.empty() ) {
460 if ( auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
461 if ( auto v = i->second.find( headerName ); v != i->second.end() ) {
462 i->second.erase (v);
463 }
464 if ( i->second.empty() )
465 d->_customHeaders.erase(i);
466 }
467 return;
468 }
469 d->_customHeaders[host][headerName] = value;
470}
471
472const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders() const
473{
474 return d_func()->_customHeaders;
475}
476
477void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
478{
479 cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
480}
481
482void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
483{
484 Z_D();
485
486 if ( req.d_func()->_dispatcher != this ) {
487 //TODO throw exception
488 return;
489 }
490
491 d->setFinished( req, err );
492}
493
494void NetworkRequestDispatcher::run()
495{
496 Z_D();
497 d->_isRunning = true;
498
499 if ( d->_pendingDownloads.size() )
500 d->dequeuePending();
501}
502
503void NetworkRequestDispatcher::reschedule()
504{
505 Z_D();
506 if ( !d->_pendingDownloads.size() )
507 return;
508
509 std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &a, const auto &b ){
510 return a->priority() < b->priority();
511 });
512
513 d->dequeuePending();
514}
515
516size_t NetworkRequestDispatcher::count()
517{
518 Z_D();
519 return d->_pendingDownloads.size() + d->_runningDownloads.size();
520}
521
522const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
523{
524 return d_func()->_lastError;
525}
526
527SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
528{
529 return d_func()->_sigDownloadStarted;
530}
531
532SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
533{
534 return d_func()->_sigDownloadFinished;
535}
536
537SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
538{
539 return d_func()->_sigQueueFinished;
540}
541
542SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
543{
544 return d_func()->_sigError;
545}
546
547}
Provides API related macros.
#define LIBZYPP_VERSION_STRING
Definition: APIConfig.h:15
Edition * _value
Definition: SysContent.cc:311
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:50
NetworkRequestDispatcherPrivate(NetworkRequestDispatcher &p)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
Signal< void(NetworkRequestDispatcher &)> _sigError
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
void setFinished(NetworkRequest &req, NetworkRequestError result)
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
void onSocketActivated(const SocketNotifier &listener, int events)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
void cancelAll(NetworkRequestError result)
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
The NetworkRequestError class represents an error that occurred in.
std::string errorMessage() const
Definition: request.cc:752
bool prepareToContinue(std::string &errBuf)
Definition: request.cc:443
unsigned short a
unsigned short b
Boost libraries.
void globalInitCurlOnce()
Definition: curlhelper.cc:64
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Definition: String.cc:36
static const std::string & defaultAgentString()
ZYPP_IMPL_PRIVATE(Provide)
#define WAR
Definition: Logger.h:97
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:113