libzypp 10.5.0
MediaMultiCurl.cc
/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /|  _/  _/                         |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/

#include <ctype.h>
#include <sys/types.h>
#include <signal.h>
#include <sys/wait.h>
#include <netdb.h>
#include <arpa/inet.h>

#include <vector>
#include <iostream>
#include <algorithm>

#include "zypp/ZConfig.h"
#include "zypp/base/Logger.h"
#include "zypp/media/MediaMultiCurl.h"
#include "zypp/media/MetaLinkParser.h"

using namespace std;
using namespace zypp::base;

#undef CURLVERSION_AT_LEAST
#define CURLVERSION_AT_LEAST(M,N,O) LIBCURL_VERSION_NUM >= ((((M)<<8)+(N))<<8)+(O)
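// LIBCURL_VERSION_NUM packs the version as 0xXXYYZZ, so e.g.
// CURLVERSION_AT_LEAST(7,15,5) expands to a comparison against
// ((((7)<<8)+(15))<<8)+(5) == 0x070F05.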

namespace zypp {
  namespace media {


class multifetchrequest;

// Hack: we derive from MediaCurl just to get the storage space for
// settings, url, curlerrors and the like

class multifetchworker : MediaCurl {
  friend class multifetchrequest;

public:
  multifetchworker(int no, multifetchrequest &request, const Url &url);
  ~multifetchworker();
  void nextjob();
  void run();
  bool checkChecksum();
  bool recheckChecksum();
  void disableCompetition();

  void checkdns();
  void adddnsfd(fd_set &rset, int &maxfd);
  void dnsevent(fd_set &rset);

  int _workerno;

  int _state;
  bool _competing;

  size_t _blkno;
  off_t _blkstart;
  size_t _blksize;
  bool _noendrange;

  double _blkstarttime;
  size_t _blkreceived;
  off_t  _received;

  double _avgspeed;
  double _maxspeed;

  double _sleepuntil;

private:
  void stealjob();

  size_t writefunction(void *ptr, size_t size);
  static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);

  size_t headerfunction(char *ptr, size_t size);
  static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);

  multifetchrequest *_request;
  int _pass;
  string _urlbuf;
  off_t _off;
  size_t _size;
  Digest _dig;

  pid_t _pid;
  int _dnspipe;
};

#define WORKER_STARTING 0
#define WORKER_LOOKUP   1
#define WORKER_FETCH    2
#define WORKER_DISCARD  3
#define WORKER_DONE     4
#define WORKER_SLEEP    5
#define WORKER_BROKEN   6

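// Worker lifecycle, as used by the code below: STARTING -> LOOKUP (async DNS
// check) -> FETCH. A FETCH whose block has been claimed by a faster competitor
// is downgraded to DISCARD; throttled workers go to SLEEP, workers with
// nothing left to do end in DONE, and any hard error (DNS failure, curl setup
// failure, checksum mismatch) parks the worker in BROKEN.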

class multifetchrequest {
public:
  multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize);
  ~multifetchrequest();

  void run(std::vector<Url> &urllist);

protected:
  friend class multifetchworker;

  const MediaMultiCurl *_context;
  const Pathname _filename;
  Url _baseurl;

  FILE *_fp;
  callback::SendReport<DownloadProgressReport> *_report;
  MediaBlockList *_blklist;
  off_t _filesize;

  CURLM *_multi;

  std::list<multifetchworker *> _workers;
  bool _stealing;
  bool _havenewjob;

  size_t _blkno;
  off_t _blkoff;
  size_t _activeworkers;
  size_t _lookupworkers;
  size_t _sleepworkers;
  double _minsleepuntil;
  bool _finished;
  off_t _totalsize;
  off_t _fetchedsize;
  off_t _fetchedgoodsize;

  double _starttime;
  double _lastprogress;

  double _lastperiodstart;
  double _lastperiodfetched;
  double _periodavg;

public:
  double _timeout;
  double _connect_timeout;
  double _maxspeed;
  int _maxworkers;
};

#define BLKSIZE         131072
#define MAXURLS         10
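// BLKSIZE (128 KiB) is the slice size used when no metalink block list
// dictates block boundaries; MAXURLS caps both the number of mirrors tried
// and the number of concurrent workers.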


static double
currentTime()
{
  struct timeval tv;
  if (gettimeofday(&tv, NULL))
    return 0;
  return tv.tv_sec + tv.tv_usec / 1000000.;
}

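// Note on the curl write callback protocol used below: libcurl aborts the
// transfer whenever the callback returns something other than the byte count
// it was given. The "size ? 0 : 1" idiom therefore always signals an error,
// even for a zero-length call.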
size_t
multifetchworker::writefunction(void *ptr, size_t size)
{
  size_t len, cnt;
  if (_state == WORKER_BROKEN)
    return size ? 0 : 1;

  double now = currentTime();

  len = size > _size ? _size : size;
  if (!len)
    {
      // kill this job?
      return size;
    }

  if (_blkstart && _off == _blkstart)
    {
      // make sure that the server replied with "partial content"
      // for http requests
      char *effurl;
      (void)curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &effurl);
      if (effurl && !strncasecmp(effurl, "http", 4))
        {
          long statuscode = 0;
          (void)curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &statuscode);
          if (statuscode != 206)
            return size ? 0 : 1;
        }
    }

  _blkreceived += len;
  _received += len;

  _request->_lastprogress = now;

  if (_state == WORKER_DISCARD || !_request->_fp)
    {
      // block is no longer needed
      // still calculate the checksum so that we can throw out bad servers
      if (_request->_blklist)
        _dig.update((const char *)ptr, len);
      _off += len;
      _size -= len;
      return size;
    }
  if (fseeko(_request->_fp, _off, SEEK_SET))
    return size ? 0 : 1;
  cnt = fwrite(ptr, 1, len, _request->_fp);
  if (cnt > 0)
    {
      _request->_fetchedsize += cnt;
      if (_request->_blklist)
        _dig.update((const char *)ptr, cnt);
      _off += cnt;
      _size -= cnt;
      if (cnt == len)
        return size;
    }
  return cnt;
}

size_t
multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->writefunction(ptr, size * nmemb);
}

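// The header callback serves two purposes: it logs redirects, and it parses
// the Content-Range reply, which is the first reliable source of the total
// file size when neither the caller nor a metalink block list supplied one.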
size_t
multifetchworker::headerfunction(char *p, size_t size)
{
  size_t l = size;
  if (l > 9 && !strncasecmp(p, "Location:", 9))
    {
      string line(p + 9, l - 9);
      if (line[l - 10] == '\r')
        line.erase(l - 10, 1);
      DBG << "#" << _workerno << ": redirecting to " << line << endl;
      return size;
    }
  if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
    return size;
  p += 14;
  l -= 14;
  while (l && (*p == ' ' || *p == '\t'))
    p++, l--;
  if (l < 6 || strncasecmp(p, "bytes", 5))
    return size;
  p += 5;
  l -= 5;
  char buf[128];
  memcpy(buf, p, l);
  buf[l] = 0;
  unsigned long long start, off, filesize;
  if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
    return size;
  if (_request->_filesize == (off_t)-1)
    {
      WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
      _request->_filesize = filesize;
      if (_request->_totalsize == 0 && !_request->_blklist)
        _request->_totalsize = filesize;
    }
  if (_request->_filesize != (off_t)filesize)
    {
      DBG << "#" << _workerno << ": filesize mismatch" << endl;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
    }
  return size;
}

size_t
multifetchworker::_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->headerfunction((char *)ptr, size * nmemb);
}

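// Each worker owns one curl easy handle, reused from the per-host pool when
// possible. Credentials are copied from the original request only when the
// mirror host matches, mirroring curl's own policy for redirects.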
multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
: MediaCurl(url, Pathname())
{
  _workerno = no;
  _request = &request;
  _state = WORKER_STARTING;
  _competing = false;
  _off = _blkstart = 0;
  _size = _blksize = 0;
  _pass = 0;
  _blkno = 0;
  _pid = 0;
  _dnspipe = -1;
  _blkreceived = 0;
  _received = 0;
  _blkstarttime = 0;
  _avgspeed = 0;
  _sleepuntil = 0;
  _maxspeed = _request->_maxspeed;
  _noendrange = false;

  Url curlUrl( clearQueryString(url) );
  _urlbuf = curlUrl.asString();
  _curl = _request->_context->fromEasyPool(_url.getHost());
  if (_curl)
    DBG << "reused worker from pool" << endl;
  if (!_curl && !(_curl = curl_easy_init()))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
      return;
    }
  try
    {
      setupEasy();
    }
  catch (Exception &ex)
    {
      curl_easy_cleanup(_curl);
      _curl = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
      return;
    }
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
  curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
  curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
  curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
  if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
    {
      curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
      curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
    }
  // if this is the same host copy authorization
  // (the host check is also what curl does when doing a redirect)
  // (note also that unauthorized exceptions are thrown with the request host)
  if (url.getHost() == _request->_context->_url.getHost())
    {
      _settings.setUsername(_request->_context->_settings.username());
      _settings.setPassword(_request->_context->_settings.password());
      _settings.setAuthType(_request->_context->_settings.authType());
      if ( _settings.userPassword().size() )
        {
          curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
          string use_auth = _settings.authType();
          if (use_auth.empty())
            use_auth = "digest,basic";        // our default
          long auth = CurlAuthData::auth_type_str2long(use_auth);
          if( auth != CURLAUTH_NONE)
          {
            DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
                << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
            curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
          }
        }
    }
  checkdns();
}

multifetchworker::~multifetchworker()
{
  if (_curl)
    {
      if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
        curl_multi_remove_handle(_request->_multi, _curl);
      if (_state == WORKER_DONE || _state == WORKER_SLEEP)
        {
#if CURLVERSION_AT_LEAST(7,15,5)
          curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
#endif
          curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
          _request->_context->toEasyPool(_url.getHost(), _curl);
        }
      else
        curl_easy_cleanup(_curl);
      _curl = 0;
    }
  if (_pid)
    {
      kill(_pid, SIGKILL);
      int status;
      while (waitpid(_pid, &status, 0) == -1)
        if (errno != EINTR)
          break;
      _pid = 0;
    }
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  // the destructor in MediaCurl doesn't call disconnect() if
  // the media is not attached, so we do it here manually
  disconnectFrom();
}

static inline bool env_isset(const string &name)
{
  const char *s = getenv(name.c_str());
  return s && *s;
}

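// DNS pre-check: the lookup runs in a forked child so a stuck resolver cannot
// stall the transfer. The child never writes to the pipe; it merely holds the
// write end open, so when it exits the parent's read end (_dnspipe) becomes
// readable (EOF) and select() in the scheduler loop picks that up.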
void
multifetchworker::checkdns()
{
  string host = _url.getHost();

  if (host.empty())
    return;

  if (_request->_context->isDNSok(host))
    return;

  // no need to do dns checking for numeric hosts
  char addrbuf[128];
  if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
    return;
  if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
    return;

  // no need to do dns checking if we use a proxy
  if (!_settings.proxy().empty())
    return;
  if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
    return;
  string schemeproxy = _url.getScheme() + "_proxy";
  if (env_isset(schemeproxy))
    return;
  if (schemeproxy != "http_proxy")
    {
      std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
      if (env_isset(schemeproxy))
        return;
    }

  DBG << "checking DNS lookup of " << host << endl;
  int pipefds[2];
  if (pipe(pipefds))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
      return;
    }
  _pid = fork();
  if (_pid == pid_t(-1))
    {
      close(pipefds[0]);
      close(pipefds[1]);
      _pid = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
      return;
    }
  else if (_pid == 0)
    {
      close(pipefds[0]);
      // XXX: close all other file descriptors
      struct addrinfo *ai, aihints;
      memset(&aihints, 0, sizeof(aihints));
      aihints.ai_family = PF_UNSPEC;
      int tstsock = socket(PF_INET6, SOCK_DGRAM | SOCK_CLOEXEC, 0);
      if (tstsock == -1)
        aihints.ai_family = PF_INET;
      else
        close(tstsock);
      aihints.ai_socktype = SOCK_STREAM;
      aihints.ai_flags = AI_CANONNAME;
      unsigned int connecttimeout = _request->_connect_timeout;
      if (connecttimeout)
        alarm(connecttimeout);
      signal(SIGALRM, SIG_DFL);
      if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
        _exit(1);
      _exit(0);
    }
  close(pipefds[1]);
  _dnspipe = pipefds[0];
  _state = WORKER_LOOKUP;
}

void
multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
{
  if (_state != WORKER_LOOKUP)
    return;
  FD_SET(_dnspipe, &rset);
  if (maxfd < _dnspipe)
    maxfd = _dnspipe;
}

void
multifetchworker::dnsevent(fd_set &rset)
{
  if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
    return;
  int status;
  while (waitpid(_pid, &status, 0) == -1)
    {
      if (errno != EINTR)
        return;
    }
  _pid = 0;
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  if (!WIFEXITED(status))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  int exitcode = WEXITSTATUS(status);
  DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
  if (exitcode != 0)
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  _request->_context->setDNSok(_url.getHost());
  nextjob();
}

bool
multifetchworker::checkChecksum()
{
  // DBG << "checkChecksum block " << _blkno << endl;
  if (!_blksize || !_request->_blklist)
    return true;
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

bool
multifetchworker::recheckChecksum()
{
  // DBG << "recheckChecksum block " << _blkno << endl;
  if (!_request->_fp || !_blksize || !_request->_blklist)
    return true;
  if (fseeko(_request->_fp, _blkstart, SEEK_SET))
    return false;
  char buf[4096];
  size_t l = _blksize;
  _request->_blklist->createDigest(_dig);       // resets digest
  while (l)
    {
      size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
      if (fread(buf, cnt, 1, _request->_fp) != 1)
        return false;
      _dig.update(buf, cnt);
      l -= cnt;
    }
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

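// Job stealing: once every block has been handed out, idle workers start
// re-downloading ("competing for") blocks still in flight elsewhere. The
// comparisons below rank workers by estimated time to completion, i.e.
// remaining_bytes / avgspeed, written in cross-multiplied form to avoid
// the division.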
void
multifetchworker::stealjob()
{
  if (!_request->_stealing)
    {
      DBG << "start stealing!" << endl;
      _request->_stealing = true;
    }
  multifetchworker *best = 0;
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  double now = 0;
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_pass == -1)
        continue;       // do not steal!
      if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
        continue;       // do not steal finished jobs
      if (!worker->_avgspeed && worker->_blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > worker->_blkstarttime)
            worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
        }
      if (!best || best->_pass > worker->_pass)
        {
          best = worker;
          continue;
        }
      if (best->_pass < worker->_pass)
        continue;
      // if it is the same block, we want to know the best worker, otherwise the worst
      if (worker->_blkstart == best->_blkstart)
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
      else
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
    }
  if (!best)
    {
      _state = WORKER_DONE;
      _request->_activeworkers--;
      _request->_finished = true;
      return;
    }
  // do not sleep twice
  if (_state != WORKER_SLEEP)
    {
      if (!_avgspeed && _blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > _blkstarttime)
            _avgspeed = _blkreceived / (now - _blkstarttime);
        }

      // let's see if we should sleep a bit
      DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
      DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
      if (_avgspeed && best->_avgspeed && best->_blksize - best->_blkreceived > 0 &&
          (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
        {
          if (!now)
            now = currentTime();
          double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
          if (sl > 1)
            sl = 1;
          DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
          _sleepuntil = now + sl;
          _state = WORKER_SLEEP;
          _request->_sleepworkers++;
          return;
        }
    }

  _competing = true;
  best->_competing = true;
  _blkstart = best->_blkstart;
  _blksize = best->_blksize;
  best->_pass++;
  _pass = best->_pass;
  _blkno = best->_blkno;
  run();
}

void
multifetchworker::disableCompetition()
{
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_blkstart == _blkstart)
        {
          if (worker->_state == WORKER_FETCH)
            worker->_state = WORKER_DISCARD;
          worker->_pass = -1;   /* do not steal this one, we already have it */
        }
    }
}

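// nextjob() hands out the next unassigned range: fixed BLKSIZE slices of a
// plain file, or the next entry of the metalink block list. Oversized blocks
// that carry no checksum are split into BLKSIZE slices, since they cannot be
// verified as a unit anyway.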
void
multifetchworker::nextjob()
{
  _noendrange = false;
  if (_request->_stealing)
    {
      stealjob();
      return;
    }

  MediaBlockList *blklist = _request->_blklist;
  if (!blklist)
    {
      _blksize = BLKSIZE;
      if (_request->_filesize != off_t(-1))
        {
          if (_request->_blkoff >= _request->_filesize)
            {
              stealjob();
              return;
            }
          _blksize = _request->_filesize - _request->_blkoff;
          if (_blksize > BLKSIZE)
            _blksize = BLKSIZE;
        }
    }
  else
    {
      MediaBlock blk = blklist->getBlock(_request->_blkno);
      while (_request->_blkoff >= (off_t)(blk.off + blk.size))
        {
          if (++_request->_blkno == blklist->numBlocks())
            {
              stealjob();
              return;
            }
          blk = blklist->getBlock(_request->_blkno);
          _request->_blkoff = blk.off;
        }
      _blksize = blk.off + blk.size - _request->_blkoff;
      if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
        _blksize = BLKSIZE;
    }
  _blkno = _request->_blkno;
  _blkstart = _request->_blkoff;
  _request->_blkoff += _blksize;
  run();
}

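// run() submits the worker's current range as an HTTP Range request on the
// multi handle. _noendrange leaves the end offset open; it is a fallback for
// ranges the server rejected with 416 (see the handling in
// multifetchrequest::run below).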
void
multifetchworker::run()
{
  char rangebuf[128];

  if (_state == WORKER_BROKEN || _state == WORKER_DONE)
    return;     // just in case...
  if (_noendrange)
    sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
  else
    sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
  DBG << "#" << _workerno << ": BLK " << _blkno << ":" << rangebuf << " " << _url << endl;
  if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
      return;
    }
  if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
      return;
    }
  _request->_havenewjob = true;
  _off = _blkstart;
  _size = _blksize;
  if (_request->_blklist)
    _request->_blklist->createDigest(_dig);     // resets digest
  _state = WORKER_FETCH;

  double now = currentTime();
  _blkstarttime = now;
  _blkreceived = 0;
}


multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) : _context(context), _filename(filename), _baseurl(baseurl)
{
  _fp = fp;
  _report = report;
  _blklist = blklist;
  _filesize = filesize;
  _multi = multi;
  _stealing = false;
  _havenewjob = false;
  _blkno = 0;
  if (_blklist)
    _blkoff = _blklist->getBlock(0).off;
  else
    _blkoff = 0;
  _activeworkers = 0;
  _lookupworkers = 0;
  _sleepworkers = 0;
  _minsleepuntil = 0;
  _finished = false;
  _fetchedsize = 0;
  _fetchedgoodsize = 0;
  _totalsize = 0;
  _lastperiodstart = _lastprogress = _starttime = currentTime();
  _lastperiodfetched = 0;
  _periodavg = 0;
  _timeout = 0;
  _connect_timeout = 0;
  _maxspeed = 0;
  _maxworkers = 0;
  if (blklist)
    {
      for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
        {
          MediaBlock blk = blklist->getBlock(blkno);
          _totalsize += blk.size;
        }
    }
  else if (filesize != off_t(-1))
    _totalsize = filesize;
}

multifetchrequest::~multifetchrequest()
{
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      *workeriter = NULL;
      delete worker;
    }
  _workers.clear();
}

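// The scheduler loop: spawn workers while mirrors and worker slots remain,
// select() over the combined curl and DNS fd sets, drive curl_multi_perform,
// wake sleeping workers, then drain curl_multi_info_read to reschedule
// finished workers, penalize slow ones, apply the rate limit and report
// progress.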
void
multifetchrequest::run(std::vector<Url> &urllist)
{
  int workerno = 0;
  std::vector<Url>::iterator urliter = urllist.begin();
  for (;;)
    {
      fd_set rset, wset, xset;
      int maxfd, nqueue;

      if (_finished)
        {
          DBG << "finished!" << endl;
          break;
        }

      if ((int)_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
        {
          // spawn another worker!
          multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
          _workers.push_back(worker);
          if (worker->_state != WORKER_BROKEN)
            {
              _activeworkers++;
              if (worker->_state != WORKER_LOOKUP)
                {
                  worker->nextjob();
                }
              else
                _lookupworkers++;
            }
          ++urliter;
          continue;
        }
      if (!_activeworkers)
        {
          WAR << "No more active workers!" << endl;
          // show the first worker error we find
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              if ((*workeriter)->_state != WORKER_BROKEN)
                continue;
              ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
            }
          break;
        }

      FD_ZERO(&rset);
      FD_ZERO(&wset);
      FD_ZERO(&xset);

      curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);

      if (_lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          (*workeriter)->adddnsfd(rset, maxfd);

      timeval tv;
      // if we added a new job we have to call multi_perform once
      // to make it show up in the fd set. do not sleep in this case.
      tv.tv_sec = 0;
      tv.tv_usec = _havenewjob ? 0 : 200000;
      if (_sleepworkers && !_havenewjob)
        {
          if (_minsleepuntil == 0)
            {
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *worker = *workeriter;
                  if (worker->_state != WORKER_SLEEP)
                    continue;
                  if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
                    _minsleepuntil = worker->_sleepuntil;
                }
            }
          double sl = _minsleepuntil - currentTime();
          if (sl < 0)
            {
              sl = 0;
              _minsleepuntil = 0;
            }
          if (sl < .2)
            tv.tv_usec = sl * 1000000;
        }
      int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
      if (r == -1 && errno != EINTR)
        ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
      if (r != 0 && _lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          {
            multifetchworker *worker = *workeriter;
            if (worker->_state != WORKER_LOOKUP)
              continue;
            (*workeriter)->dnsevent(rset);
            if (worker->_state != WORKER_LOOKUP)
              _lookupworkers--;
          }
      _havenewjob = false;

      // run curl
      for (;;)
        {
          CURLMcode mcode;
          int tasks;
          mcode = curl_multi_perform(_multi, &tasks);
          if (mcode == CURLM_CALL_MULTI_PERFORM)
            continue;
          if (mcode != CURLM_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
          break;
        }

      double now = currentTime();

      // update periodavg
      if (now > _lastperiodstart + .5)
        {
          if (!_periodavg)
            _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
          else
            _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
          _lastperiodfetched = _fetchedsize;
          _lastperiodstart = now;
        }

      // wake up sleepers
      if (_sleepworkers)
        {
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              multifetchworker *worker = *workeriter;
              if (worker->_state != WORKER_SLEEP)
                continue;
              if (worker->_sleepuntil > now)
                continue;
              if (_minsleepuntil == worker->_sleepuntil)
                _minsleepuntil = 0;
              DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
              _sleepworkers--;
              // nextjob changes the state
              worker->nextjob();
            }
        }

      // collect all curl results, reschedule new jobs
      CURLMsg *msg;
      while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
        {
          if (msg->msg != CURLMSG_DONE)
            continue;
          CURL *easy = msg->easy_handle;
          CURLcode cc = msg->data.result;
          multifetchworker *worker;
          if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
          if (worker->_blkreceived && now > worker->_blkstarttime)
            {
              if (worker->_avgspeed)
                worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
              else
                worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
            }
          DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
          curl_multi_remove_handle(_multi, easy);
          if (cc == CURLE_HTTP_RETURNED_ERROR)
            {
              long statuscode = 0;
              (void)curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &statuscode);
              DBG << "HTTP status " << statuscode << endl;
              if (statuscode == 416 && !_blklist)       /* Range error */
                {
                  if (_filesize == off_t(-1))
                    {
                      if (!worker->_noendrange)
                        {
                          DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
                          worker->_noendrange = true;
                          worker->run();
                          continue;
                        }
                      worker->_noendrange = false;
                      worker->stealjob();
                      continue;
                    }
                  if (worker->_blkstart >= _filesize)
                    {
                      worker->nextjob();
                      continue;
                    }
                }
            }
          if (cc == 0)
            {
              if (!worker->checkChecksum())
                {
                  WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
                  worker->_state = WORKER_BROKEN;
                  strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
                  _activeworkers--;
                  continue;
                }
              if (worker->_state == WORKER_FETCH)
                {
                  if (worker->_competing)
                    {
                      worker->disableCompetition();
                      // multiple workers wrote into this block. We already know that our
                      // data was correct, but maybe some other worker overwrote our data
                      // with something broken. Thus we have to re-check the block.
                      if (!worker->recheckChecksum())
                        {
                          DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
                          // re-fetch! No need to worry about the bad workers,
                          // they will now be set to DISCARD. At the end of their block
                          // they will notice that they wrote bad data and go into BROKEN.
                          worker->run();
                          continue;
                        }
                    }
                  _fetchedgoodsize += worker->_blksize;
                }

              // make bad workers sleep a little
              double maxavg = 0;
              int maxworkerno = 0;
              int numbetter = 0;
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *oworker = *workeriter;
                  if (oworker->_state == WORKER_BROKEN)
                    continue;
                  if (oworker->_avgspeed > maxavg)
                    {
                      maxavg = oworker->_avgspeed;
                      maxworkerno = oworker->_workerno;
                    }
                  if (oworker->_avgspeed > worker->_avgspeed)
                    numbetter++;
                }
              if (maxavg && !_stealing)
                {
                  double ratio = worker->_avgspeed / maxavg;
                  ratio = 1 - ratio;
                  if (numbetter < 3)    // don't sleep that much if we're in the top three
                    ratio = ratio * ratio;
                  if (ratio > .01)
                    {
                      DBG << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
                      worker->_sleepuntil = now + ratio;
                      worker->_state = WORKER_SLEEP;
                      _sleepworkers++;
                      continue;
                    }
                }

              // do rate control (if requested)
              // should use periodavg, but that's not what libcurl does
              if (_maxspeed && now > _starttime)
                {
                  double avg = _fetchedsize / (now - _starttime);
                  avg = worker->_maxspeed * _maxspeed / avg;
                  if (avg < _maxspeed / _maxworkers)
                    avg = _maxspeed / _maxworkers;
                  if (avg > _maxspeed)
                    avg = _maxspeed;
                  if (avg < 1024)
                    avg = 1024;
                  worker->_maxspeed = avg;
#if CURLVERSION_AT_LEAST(7,15,5)
                  curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
#endif
                }

              worker->nextjob();
            }
          else
            {
              worker->_state = WORKER_BROKEN;
              _activeworkers--;
              if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
                {
                  // end of workers reached! goodbye!
                  worker->evaluateCurlCode(Pathname(), cc, false);
                }
            }
        }

      // send report
      if (_report)
        {
          int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
          double avg = 0;
          if (now > _starttime)
            avg = _fetchedsize / (now - _starttime);
          if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
            ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
        }

      if (_timeout && now - _lastprogress > _timeout)
        break;
    }

  if (!_finished)
    ZYPP_THROW(MediaTimeoutException(_baseurl));

  // print some download stats
  WAR << "overall result" << endl;
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
    }
}


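// Illustrative only: MediaMultiCurl is normally driven through the generic
// media manager. A direct call would look roughly like the sketch below;
// the URL, paths and option value are invented for illustration and are not
// part of this file.
//
//   MediaMultiCurl media(Url("http://download.example.org/repo"), Pathname());
//   callback::SendReport<DownloadProgressReport> report;
//   media.doGetFileCopy(Pathname("noarch/foo.rpm"), Pathname("/tmp/foo.rpm"),
//                       report, MediaCurl::OPTION_NONE);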
MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
    : MediaCurl(url_r, attach_point_hint_r)
{
  MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
  _multi = 0;
  _customHeadersMetalink = 0;
}

MediaMultiCurl::~MediaMultiCurl()
{
  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  if (_multi)
    {
      curl_multi_cleanup(_multi);
      _multi = 0;
    }
  std::map<std::string, CURL *>::iterator it;
  for (it = _easypool.begin(); it != _easypool.end(); it++)
    {
      CURL *easy = it->second;
      if (easy)
        {
          curl_easy_cleanup(easy);
          it->second = NULL;
        }
    }
}

void MediaMultiCurl::setupEasy()
{
  MediaCurl::setupEasy();

  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  struct curl_slist *sl = _customHeaders;
  for (; sl; sl = sl->next)
    _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
  _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml");
}

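// Content sniffing: some proxies strip the metalink content type, so we also
// peek at the first bytes of the downloaded file and treat it as metalink if
// the root element is <metalink>, optionally preceded by an XML declaration
// (see bnc#649925 below).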
static bool looks_like_metalink(const Pathname & file)
{
  char buf[256], *p;
  int fd, l;
  if ((fd = open(file.asString().c_str(), O_RDONLY|O_CLOEXEC)) == -1)
    return false;
  while ((l = read(fd, buf, sizeof(buf) - 1)) == -1 && errno == EINTR)
    ;
  close(fd);
  if (l == -1)
    return false;
  buf[l] = 0;
  p = buf;
  while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
    p++;
  if (!strncasecmp(p, "<?xml", 5))
    {
      while (*p && *p != '>')
        p++;
      if (*p == '>')
        p++;
      while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
        p++;
    }
  bool ret = !strncasecmp(p, "<metalink", 9) ? true : false;
  DBG << "looks_like_metalink(" << file << "): " << ret << endl;
  return ret;
}

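// doGetFileCopy: download into a temp file, advertising metalink support via
// the Accept header. If the reply turns out to be a metalink document, parse
// it, reuse any blocks we already have locally (old target, previous failed
// attempt, delta file) and hand the remainder to multifetch(); on failure
// fall back to a plain single-stream download.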
void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
{
  Pathname dest = target.absolutename();
  if( assert_dir( dest.dirname() ) )
  {
    DBG << "assert_dir " << dest.dirname() << " failed" << endl;
    Url url(getFileUrl(filename));
    ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) );
  }
  string destNew = target.asString() + ".new.zypp.XXXXXX";
  char *buf = ::strdup( destNew.c_str());
  if( !buf)
  {
    ERR << "out of memory for temp file name" << endl;
    Url url(getFileUrl(filename));
    ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name"));
  }

  int tmp_fd = ::mkostemp( buf, O_CLOEXEC );
  if( tmp_fd == -1)
  {
    free( buf);
    ERR << "mkostemp failed for file '" << destNew << "'" << endl;
    ZYPP_THROW(MediaWriteException(destNew));
  }
  destNew = buf;
  free( buf);

  FILE *file = ::fdopen( tmp_fd, "we" );
  if ( !file ) {
    ::close( tmp_fd);
    filesystem::unlink( destNew );
    ERR << "fdopen failed for file '" << destNew << "'" << endl;
    ZYPP_THROW(MediaWriteException(destNew));
  }
  DBG << "dest: " << dest << endl;
  DBG << "temp: " << destNew << endl;

  // set IFMODSINCE time condition (no download if not modified)
  if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
  }
  else
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  }
  // change header to include Accept: metalink
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
  try
    {
      MediaCurl::doGetFileCopyFile(filename, dest, file, report, options);
    }
  catch (Exception &ex)
    {
      ::fclose(file);
      filesystem::unlink(destNew);
      curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
      curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
      curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
      ZYPP_RETHROW(ex);
    }
  curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
  curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
  long httpReturnCode = 0;
  CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
  if (infoRet == CURLE_OK)
  {
    DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
    if ( httpReturnCode == 304
         || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
    {
      DBG << "not modified: " << PathInfo(dest) << endl;
      return;
    }
  }
  else
  {
    WAR << "Could not get the response code." << endl;
  }

  bool ismetalink = false;

  char *ptr = NULL;
  if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
    {
      string ct = string(ptr);
      if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
        ismetalink = true;
    }

  if (!ismetalink)
    {
      // some proxies do not store the content type, so also look at the file to find
      // out if we received a metalink (bnc#649925)
      fflush(file);
      if (looks_like_metalink(Pathname(destNew)))
        ismetalink = true;
    }

  if (ismetalink)
    {
      bool userabort = false;
      fclose(file);
      file = NULL;
      Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
      try
        {
          MetaLinkParser mlp;
          mlp.parse(Pathname(destNew));
          MediaBlockList bl = mlp.getBlockList();
          vector<Url> urls = mlp.getUrls();
          DBG << bl << endl;
          file = fopen(destNew.c_str(), "w+e");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          if (PathInfo(target).isExist())
            {
              DBG << "reusing blocks from file " << target << endl;
              bl.reuseBlocks(file, target.asString());
              DBG << bl << endl;
            }
          if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
            {
              DBG << "reusing blocks from file " << failedFile << endl;
              bl.reuseBlocks(file, failedFile.asString());
              DBG << bl << endl;
              filesystem::unlink(failedFile);
            }
          Pathname df = deltafile();
          if (!df.empty())
            {
              DBG << "reusing blocks from file " << df << endl;
              bl.reuseBlocks(file, df.asString());
              DBG << bl << endl;
            }
          try
            {
              multifetch(filename, file, &urls, &report, &bl);
            }
          catch (MediaCurlException &ex)
            {
              userabort = ex.errstr() == "User abort";
              ZYPP_RETHROW(ex);
            }
        }
      catch (Exception &ex)
        {
          // something went wrong. fall back to normal download
          if (file)
            fclose(file);
          file = NULL;
          if (PathInfo(destNew).size() >= 63336)
            {
              ::unlink(failedFile.asString().c_str());
              filesystem::hardlinkCopy(destNew, failedFile);
            }
          if (userabort)
            {
              filesystem::unlink(destNew);
              ZYPP_RETHROW(ex);
            }
          file = fopen(destNew.c_str(), "w+e");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START);
        }
    }

  if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
    {
      ERR << "Failed to chmod file " << destNew << endl;
    }
  if (::fclose(file))
    {
      filesystem::unlink(destNew);
      ERR << "Fclose failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
  if ( rename( destNew, dest ) != 0 )
    {
      ERR << "Rename failed" << endl;
      ZYPP_THROW(MediaWriteException(dest));
    }
  DBG << "done: " << PathInfo(dest) << endl;
}

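// multifetch() decides between a real multi-mirror fetch and a mere digest
// check: a block list without usable blocks degenerates to verifying the
// file checksum. Mirror candidates are filtered to http/https/ftp and the
// worker count is clamped to [1, MAXURLS].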
void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const
{
  Url baseurl(getFileUrl(filename));
  if (blklist && filesize == off_t(-1) && blklist->haveFilesize())
    filesize = blklist->getFilesize();
  if (blklist && !blklist->haveBlocks() && filesize != 0)
    blklist = 0;
  if (blklist && (filesize == 0 || !blklist->numBlocks()))
    {
      checkFileDigest(baseurl, fp, blklist);
      return;
    }
  if (filesize == 0)
    return;
  if (!_multi)
    {
      _multi = curl_multi_init();
      if (!_multi)
        ZYPP_THROW(MediaCurlInitException(baseurl));
    }
  multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize);
  req._timeout = _settings.timeout();
  req._connect_timeout = _settings.connectTimeout();
  req._maxspeed = _settings.maxDownloadSpeed();
  req._maxworkers = _settings.maxConcurrentConnections();
  if (req._maxworkers > MAXURLS)
    req._maxworkers = MAXURLS;
  if (req._maxworkers <= 0)
    req._maxworkers = 1;
  std::vector<Url> myurllist;
  for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
    {
      try
        {
          string scheme = urliter->getScheme();
          if (scheme == "http" || scheme == "https" || scheme == "ftp")
            {
              checkProtocol(*urliter);
              myurllist.push_back(*urliter);
            }
        }
      catch (...)
        {
        }
    }
  if (!myurllist.size())
    myurllist.push_back(baseurl);
  req.run(myurllist);
  checkFileDigest(baseurl, fp, blklist);
}

void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const
{
  if (!blklist || !blklist->haveFileChecksum())
    return;
  if (fseeko(fp, off_t(0), SEEK_SET))
    ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
  Digest dig;
  blklist->createFileDigest(dig);
  char buf[4096];
  size_t l;
  while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
    dig.update(buf, l);
  if (!blklist->verifyFileDigest(dig))
    ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
}

bool MediaMultiCurl::isDNSok(const string &host) const
{
  return _dnsok.find(host) != _dnsok.end();
}

void MediaMultiCurl::setDNSok(const string &host) const
{
  _dnsok.insert(host);
}

CURL *MediaMultiCurl::fromEasyPool(const string &host) const
{
  if (_easypool.find(host) == _easypool.end())
    return 0;
  CURL *ret = _easypool[host];
  _easypool.erase(host);
  return ret;
}

void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
{
  CURL *oldeasy = _easypool[host];
  _easypool[host] = easy;
  if (oldeasy)
    curl_easy_cleanup(oldeasy);
}

  } // namespace media
} // namespace zypp