libzypp 8.13.6

MediaMultiCurl.cc

/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /|  _/  _/                         |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/

#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>

#include <vector>
#include <iostream>
#include <algorithm>

#include "zypp/ZConfig.h"
#include "zypp/base/Logger.h"
#include "zypp/media/MediaMultiCurl.h"
#include "zypp/media/MetaLinkParser.h"

using namespace std;
using namespace zypp::base;

namespace zypp {
  namespace media {

class multifetchrequest;

// Hack: we derive from MediaCurl just to get the storage space for
// settings, url, the curl error buffer and the like

class multifetchworker : MediaCurl {
  friend class multifetchrequest;

public:
  multifetchworker(int no, multifetchrequest &request, const Url &url);
  ~multifetchworker();
  void nextjob();
  void run();
  bool checkChecksum();
  bool recheckChecksum();
  void disableCompetition();

  void checkdns();
  void adddnsfd(fd_set &rset, int &maxfd);
  void dnsevent(fd_set &rset);

  int _workerno;

  int _state;
  bool _competing;

  size_t _blkno;
  off_t _blkstart;
  size_t _blksize;
  bool _noendrange;

  double _blkstarttime;
  size_t _blkreceived;
  off_t  _received;

  double _avgspeed;
  double _maxspeed;

  double _sleepuntil;

private:
  void stealjob();

  size_t writefunction(void *ptr, size_t size);
  static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);

  size_t headerfunction(char *ptr, size_t size);
  static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);

  multifetchrequest *_request;
  int _pass;
  string _urlbuf;
  off_t _off;
  size_t _size;
  Digest _dig;

  pid_t _pid;
  int _dnspipe;
};

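// Worker state machine used below: STARTING -> LOOKUP (asynchronous
// DNS check) -> FETCH -> DONE, with detours through SLEEP (throttled
// because other mirrors are faster), DISCARD (the block is no longer
// needed, but the transfer is drained so the server can still be
// checksummed) and BROKEN (worker is taken out of rotation; _curlError
// holds the reason).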
#define WORKER_STARTING 0
#define WORKER_LOOKUP   1
#define WORKER_FETCH    2
#define WORKER_DISCARD  3
#define WORKER_DONE     4
#define WORKER_SLEEP    5
#define WORKER_BROKEN   6


class multifetchrequest {
public:
  multifetchrequest(const MediaMultiCurl *context, const Pathname &filename,
                    const Url &baseurl, CURLM *multi, FILE *fp,
                    callback::SendReport<DownloadProgressReport> *report,
                    MediaBlockList *blklist, off_t filesize);
  ~multifetchrequest();

  void run(std::vector<Url> &urllist);

protected:
  friend class multifetchworker;

  const MediaMultiCurl *_context;
  const Pathname _filename;
  Url _baseurl;

  FILE *_fp;
  callback::SendReport<DownloadProgressReport> *_report;
  MediaBlockList *_blklist;
  off_t _filesize;

  CURLM *_multi;

  std::list<multifetchworker *> _workers;
  bool _stealing;
  bool _havenewjob;

  size_t _blkno;
  off_t _blkoff;
  size_t _activeworkers;
  size_t _lookupworkers;
  size_t _sleepworkers;
  double _minsleepuntil;
  bool _finished;
  off_t _totalsize;
  off_t _fetchedsize;
  off_t _fetchedgoodsize;

  double _starttime;
  double _lastprogress;

  double _lastperiodstart;
  double _lastperiodfetched;
  double _periodavg;

public:
  double _timeout;
  double _connect_timeout;
  double _maxspeed;
  int _maxworkers;
};

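// Plain downloads are fetched in BLKSIZE (128 kB) ranges; metalink
// downloads use the sizes from the block list instead, capped at
// BLKSIZE for blocks that carry no checksum. MAXURLS bounds both the
// number of mirrors tried and the number of parallel workers.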
#define BLKSIZE         131072
#define MAXURLS         10


static double
currentTime()
{
  struct timeval tv;
  if (gettimeofday(&tv, NULL))
    return 0;
  return tv.tv_sec + tv.tv_usec / 1000000.;
}

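// libcurl write callback. Returning anything other than the number of
// bytes handed in makes libcurl fail the transfer, hence the
// "size ? 0 : 1" idiom: report an error even for a zero-sized buffer,
// where a plain "return 0" would look like success.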
size_t
multifetchworker::writefunction(void *ptr, size_t size)
{
  size_t len, cnt;
  if (_state == WORKER_BROKEN)
    return size ? 0 : 1;

  double now = currentTime();

  len = size > _size ? _size : size;
  if (!len)
    {
      // kill this job?
      return size;
    }

  if (_blkstart && _off == _blkstart)
    {
      // make sure that the server replied with "partial content"
      // for http requests
      char *effurl = 0;
      (void)curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &effurl);
      if (effurl && !strncasecmp(effurl, "http", 4))
        {
          long statuscode = 0;
          (void)curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &statuscode);
          if (statuscode != 206)
            return size ? 0 : 1;
        }
    }

  _blkreceived += len;
  _received += len;

  _request->_lastprogress = now;

  if (_state == WORKER_DISCARD || !_request->_fp)
    {
      // block is no longer needed
      // still calculate the checksum so that we can throw out bad servers
      if (_request->_blklist)
        _dig.update((const char *)ptr, len);
      _off += len;
      _size -= len;
      return size;
    }
  if (fseeko(_request->_fp, _off, SEEK_SET))
    return size ? 0 : 1;
  cnt = fwrite(ptr, 1, len, _request->_fp);
  if (cnt > 0)
    {
      _request->_fetchedsize += cnt;
      if (_request->_blklist)
        _dig.update((const char *)ptr, cnt);
      _off += cnt;
      _size -= cnt;
      if (cnt == len)
        return size;
    }
  return cnt;
}

size_t
multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->writefunction(ptr, size * nmemb);
}

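// libcurl header callback: notes "Location:" redirects and parses
// "Content-Range: bytes START-END/FILESIZE" (e.g. "bytes
// 0-131071/2097152") to learn the total file size and to cross-check
// it against what the other mirrors reported.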
size_t
multifetchworker::headerfunction(char *p, size_t size)
{
  size_t l = size;
  if (l > 9 && !strncasecmp(p, "Location:", 9))
    {
      string line(p + 9, l - 9);
      if (line[l - 10] == '\r')
        line.erase(l - 10, 1);
      DBG << "#" << _workerno << ": redirecting to " << line << endl;
      return size;
    }
  if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
    return size;
  p += 14;
  l -= 14;
  while (l && (*p == ' ' || *p == '\t'))
    p++, l--;
  if (l < 6 || strncasecmp(p, "bytes", 5))
    return size;
  p += 5;
  l -= 5;
  char buf[128];
  memcpy(buf, p, l);
  buf[l] = 0;
  unsigned long long start, off, filesize;
  if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
    return size;
  if (_request->_filesize == (off_t)-1)
    {
      WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
      _request->_filesize = filesize;
      if (_request->_totalsize == 0 && !_request->_blklist)
        _request->_totalsize = filesize;
    }
  if (_request->_filesize != (off_t)filesize)
    {
      DBG << "#" << _workerno << ": filesize mismatch" << endl;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
    }
  return size;
}

size_t
multifetchworker::_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->headerfunction((char *)ptr, size * nmemb);
}

multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
: MediaCurl(url, Pathname())
{
  _workerno = no;
  _request = &request;
  _state = WORKER_STARTING;
  _competing = false;
  _off = _blkstart = 0;
  _size = _blksize = 0;
  _pass = 0;
  _blkno = 0;
  _pid = 0;
  _dnspipe = -1;
  _blkreceived = 0;
  _received = 0;
  _blkstarttime = 0;
  _avgspeed = 0;
  _sleepuntil = 0;
  _maxspeed = _request->_maxspeed;
  _noendrange = false;

  Url curlUrl( clearQueryString(url) );
  _urlbuf = curlUrl.asString();
  _curl = _request->_context->fromEasyPool(_url.getHost());
  if (_curl)
    DBG << "reused worker from pool" << endl;
  if (!_curl && !(_curl = curl_easy_init()))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
      return;
    }
  try
    {
      setupEasy();
    }
  catch (Exception &ex)
    {
      curl_easy_cleanup(_curl);
      _curl = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
      return;
    }
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
  curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
  curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
  curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
  if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
    {
      curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
      curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
    }
  // if this is the same host copy authorization
  // (the host check is also what curl does when doing a redirect)
  // (note also that unauthorized exceptions are thrown with the request host)
  if (url.getHost() == _request->_context->_url.getHost())
    {
      _settings.setUsername(_request->_context->_settings.username());
      _settings.setPassword(_request->_context->_settings.password());
      _settings.setAuthType(_request->_context->_settings.authType());
      if ( _settings.userPassword().size() )
        {
          curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
          string use_auth = _settings.authType();
          if (use_auth.empty())
            use_auth = "digest,basic";        // our default
          long auth = CurlAuthData::auth_type_str2long(use_auth);
          if( auth != CURLAUTH_NONE)
          {
            DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
                << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
            curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
          }
        }
    }
  checkdns();
}

multifetchworker::~multifetchworker()
{
  if (_curl)
    {
      if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
        curl_multi_remove_handle(_request->_multi, _curl);
      if (_state == WORKER_DONE || _state == WORKER_SLEEP)
        {
          curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
          curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
          _request->_context->toEasyPool(_url.getHost(), _curl);
        }
      else
        curl_easy_cleanup(_curl);
      _curl = 0;
    }
  if (_pid)
    {
      kill(_pid, SIGKILL);
      int status;
      while (waitpid(_pid, &status, 0) == -1)
        if (errno != EINTR)
          break;
      _pid = 0;
    }
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  // the destructor in MediaCurl doesn't call disconnect() if
  // the media is not attached, so we do it here manually
  disconnectFrom();
}

static inline bool env_isset(const string &name)
{
  const char *s = getenv(name.c_str());
  return s && *s ? true : false;
}

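// Verify that the worker's host resolves before letting it compete.
// The lookup runs getaddrinfo() in a forked child so that the main
// loop never blocks on a dead mirror; the read end of a pipe inherited
// by the child becomes readable when the child exits and is polled via
// adddnsfd()/dnsevent().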
void
multifetchworker::checkdns()
{
  string host = _url.getHost();

  if (host.empty())
    return;

  if (_request->_context->isDNSok(host))
    return;

  // no need to do dns checking for numeric hosts
  char addrbuf[128];
  if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
    return;
  if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
    return;

  // no need to do dns checking if we use a proxy
  if (!_settings.proxy().empty())
    return;
  if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
    return;
  string schemeproxy = _url.getScheme() + "_proxy";
  if (env_isset(schemeproxy))
    return;
  if (schemeproxy != "http_proxy")
    {
      std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
      if (env_isset(schemeproxy))
        return;
    }

  DBG << "checking DNS lookup of " << host << endl;
  int pipefds[2];
  if (pipe(pipefds))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
      return;
    }
  _pid = fork();
  if (_pid == pid_t(-1))
    {
      close(pipefds[0]);
      close(pipefds[1]);
      _pid = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
      return;
    }
  else if (_pid == 0)
    {
      close(pipefds[0]);
      // XXX: close all other file descriptors
      struct addrinfo *ai, aihints;
      memset(&aihints, 0, sizeof(aihints));
      aihints.ai_family = PF_UNSPEC;
      int tstsock = socket(PF_INET6, SOCK_DGRAM, 0);
      if (tstsock == -1)
        aihints.ai_family = PF_INET;
      else
        close(tstsock);
      aihints.ai_socktype = SOCK_STREAM;
      aihints.ai_flags = AI_CANONNAME;
      unsigned int connecttimeout = _request->_connect_timeout;
      signal(SIGALRM, SIG_DFL);         // make sure the alarm terminates the child
      if (connecttimeout)
        alarm(connecttimeout);
      if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
        _exit(1);
      _exit(0);
    }
  close(pipefds[1]);
  _dnspipe = pipefds[0];
  _state = WORKER_LOOKUP;
}

void
multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
{
  if (_state != WORKER_LOOKUP)
    return;
  FD_SET(_dnspipe, &rset);
  if (maxfd < _dnspipe)
    maxfd = _dnspipe;
}

void
multifetchworker::dnsevent(fd_set &rset)
{
  if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
    return;
  int status;
  while (waitpid(_pid, &status, 0) == -1)
    {
      if (errno != EINTR)
        return;
    }
  _pid = 0;
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  if (!WIFEXITED(status))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  int exitcode = WEXITSTATUS(status);
  DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
  if (exitcode != 0)
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  _request->_context->setDNSok(_url.getHost());
  nextjob();
}

bool
multifetchworker::checkChecksum()
{
  // DBG << "checkChecksum block " << _blkno << endl;
  if (!_blksize || !_request->_blklist)
    return true;
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

bool
multifetchworker::recheckChecksum()
{
  // DBG << "recheckChecksum block " << _blkno << endl;
  if (!_request->_fp || !_blksize || !_request->_blklist)
    return true;
  if (fseeko(_request->_fp, _blkstart, SEEK_SET))
    return false;
  char buf[4096];
  size_t l = _blksize;
  _request->_blklist->createDigest(_dig);       // resets digest
  while (l)
    {
      size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
      if (fread(buf, cnt, 1, _request->_fp) != 1)
        return false;
      _dig.update(buf, cnt);
      l -= cnt;
    }
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

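// Called when no fresh blocks are left: pick another worker's
// unfinished block and fetch it in parallel ("competition"). Jobs with
// the lowest pass count are preferred; among different blocks the one
// that looks slowest to finish is stolen, and a worker that would only
// duplicate faster work is put to sleep instead.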
void
multifetchworker::stealjob()
{
  if (!_request->_stealing)
    {
      DBG << "start stealing!" << endl;
      _request->_stealing = true;
    }
  multifetchworker *best = 0;
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  double now = 0;
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_pass == -1)
        continue;       // do not steal!
      if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
        continue;       // do not steal finished jobs
      if (!worker->_avgspeed && worker->_blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > worker->_blkstarttime)
            worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
        }
      if (!best || best->_pass > worker->_pass)
        {
          best = worker;
          continue;
        }
      if (best->_pass < worker->_pass)
        continue;
      // if it is the same block, we want to know the best worker, otherwise the worst
      if (worker->_blkstart == best->_blkstart)
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
      else
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
    }
  if (!best)
    {
      _state = WORKER_DONE;
      _request->_activeworkers--;
      _request->_finished = true;
      return;
    }
  // do not sleep twice
  if (_state != WORKER_SLEEP)
    {
      if (!_avgspeed && _blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > _blkstarttime)
            _avgspeed = _blkreceived / (now - _blkstarttime);
        }

      // let's see if we should sleep a bit
      DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
      DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
      if (_avgspeed && best->_avgspeed && best->_blksize - best->_blkreceived > 0 &&
          (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
        {
          if (!now)
            now = currentTime();
          double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
          if (sl > 1)
            sl = 1;
          DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
          _sleepuntil = now + sl;
          _state = WORKER_SLEEP;
          _request->_sleepworkers++;
          return;
        }
    }

  _competing = true;
  best->_competing = true;
  _blkstart = best->_blkstart;
  _blksize = best->_blksize;
  best->_pass++;
  _pass = best->_pass;
  _blkno = best->_blkno;
  run();
}

void
multifetchworker::disableCompetition()
{
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_blkstart == _blkstart)
        {
          if (worker->_state == WORKER_FETCH)
            worker->_state = WORKER_DISCARD;
          worker->_pass = -1;   /* do not steal this one, we already have it */
        }
    }
}

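// Claim the next job: the next entry from the metalink block list, or
// the next BLKSIZE slice of the file when there is no block list. Once
// the file is exhausted the worker switches to stealing from its peers.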
void
multifetchworker::nextjob()
{
  _noendrange = false;
  if (_request->_stealing)
    {
      stealjob();
      return;
    }

  MediaBlockList *blklist = _request->_blklist;
  if (!blklist)
    {
      _blksize = BLKSIZE;
      if (_request->_filesize != off_t(-1))
        {
          if (_request->_blkoff >= _request->_filesize)
            {
              stealjob();
              return;
            }
          _blksize = _request->_filesize - _request->_blkoff;
          if (_blksize > BLKSIZE)
            _blksize = BLKSIZE;
        }
    }
  else
    {
      MediaBlock blk = blklist->getBlock(_request->_blkno);
      while (_request->_blkoff >= blk.off + blk.size)
        {
          if (++_request->_blkno == blklist->numBlocks())
            {
              stealjob();
              return;
            }
          blk = blklist->getBlock(_request->_blkno);
          _request->_blkoff = blk.off;
        }
      _blksize = blk.off + blk.size - _request->_blkoff;
      if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
        _blksize = BLKSIZE;
    }
  _blkno = _request->_blkno;
  _blkstart = _request->_blkoff;
  _request->_blkoff += _blksize;
  run();
}

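// (Re)start the transfer for the current block: set the HTTP Range
// header, reset the digest and hand the easy handle to the multi
// handle. The actual I/O is driven by multifetchrequest::run().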
void
multifetchworker::run()
{
  char rangebuf[128];

  if (_state == WORKER_BROKEN || _state == WORKER_DONE)
     return;    // just in case...
  if (_noendrange)
    sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
  else
    sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
  DBG << "#" << _workerno << ": BLK " << _blkno << ":" << rangebuf << " " << _url << endl;
  if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
      return;
    }
  if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
      return;
    }
  _request->_havenewjob = true;
  _off = _blkstart;
  _size = _blksize;
  if (_request->_blklist)
    _request->_blklist->createDigest(_dig);     // resets digest
  _state = WORKER_FETCH;

  double now = currentTime();
  _blkstarttime = now;
  _blkreceived = 0;
}


multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename,
                                     const Url &baseurl, CURLM *multi, FILE *fp,
                                     callback::SendReport<DownloadProgressReport> *report,
                                     MediaBlockList *blklist, off_t filesize)
: _context(context), _filename(filename), _baseurl(baseurl)
{
  _fp = fp;
  _report = report;
  _blklist = blklist;
  _filesize = filesize;
  _multi = multi;
  _stealing = false;
  _havenewjob = false;
  _blkno = 0;
  if (_blklist)
    _blkoff = _blklist->getBlock(0).off;
  else
    _blkoff = 0;
  _activeworkers = 0;
  _lookupworkers = 0;
  _sleepworkers = 0;
  _minsleepuntil = 0;
  _finished = false;
  _fetchedsize = 0;
  _fetchedgoodsize = 0;
  _totalsize = 0;
  _lastperiodstart = _lastprogress = _starttime = currentTime();
  _lastperiodfetched = 0;
  _periodavg = 0;
  _timeout = 0;
  _connect_timeout = 0;
  _maxspeed = 0;
  _maxworkers = 0;
  if (blklist)
    {
      for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
        {
          MediaBlock blk = blklist->getBlock(blkno);
          _totalsize += blk.size;
        }
    }
  else if (filesize != off_t(-1))
    _totalsize = filesize;
}

multifetchrequest::~multifetchrequest()
{
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      *workeriter = NULL;
      delete worker;
    }
  _workers.clear();
}

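// Main download loop: spawn up to _maxworkers workers (one per mirror),
// select() on the curl fd set plus any DNS pipes, drive
// curl_multi_perform(), then harvest finished transfers, put slow
// mirrors to sleep, apply the rate limit and report progress. Throws
// MediaTimeoutException when no progress is made for _timeout seconds.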
void
multifetchrequest::run(std::vector<Url> &urllist)
{
  int workerno = 0;
  std::vector<Url>::iterator urliter = urllist.begin();
  for (;;)
    {
      fd_set rset, wset, xset;
      int maxfd, nqueue;

      if (_finished)
        {
          DBG << "finished!" << endl;
          break;
        }

      if (_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
        {
          // spawn another worker!
          multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
          _workers.push_back(worker);
          if (worker->_state != WORKER_BROKEN)
            {
              _activeworkers++;
              if (worker->_state != WORKER_LOOKUP)
                {
                  worker->nextjob();
                }
              else
                _lookupworkers++;
            }
          ++urliter;
          continue;
        }
      if (!_activeworkers)
        {
          WAR << "No more active workers!" << endl;
          // show the first worker error we find
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              if ((*workeriter)->_state != WORKER_BROKEN)
                continue;
              ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
            }
          break;
        }

      FD_ZERO(&rset);
      FD_ZERO(&wset);
      FD_ZERO(&xset);

      curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);

      if (_lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          (*workeriter)->adddnsfd(rset, maxfd);

      timeval tv;
      // if we added a new job we have to call multi_perform once
      // to make it show up in the fd set. do not sleep in this case.
      tv.tv_sec = 0;
      tv.tv_usec = _havenewjob ? 0 : 200000;
      if (_sleepworkers && !_havenewjob)
        {
          if (_minsleepuntil == 0)
            {
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *worker = *workeriter;
                  if (worker->_state != WORKER_SLEEP)
                    continue;
                  if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
                    _minsleepuntil = worker->_sleepuntil;
                }
            }
          double sl = _minsleepuntil - currentTime();
          if (sl < 0)
            {
              sl = 0;
              _minsleepuntil = 0;
            }
          if (sl < .2)
            tv.tv_usec = sl * 1000000;
        }
      int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
      if (r == -1 && errno != EINTR)
        ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
      if (r != 0 && _lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          {
            multifetchworker *worker = *workeriter;
            if (worker->_state != WORKER_LOOKUP)
              continue;
            (*workeriter)->dnsevent(rset);
            if (worker->_state != WORKER_LOOKUP)
              _lookupworkers--;
          }
      _havenewjob = false;

      // run curl
      for (;;)
        {
          CURLMcode mcode;
          int tasks;
          mcode = curl_multi_perform(_multi, &tasks);
          if (mcode == CURLM_CALL_MULTI_PERFORM)
            continue;
          if (mcode != CURLM_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
          break;
        }

      double now = currentTime();

      // update periodavg
      if (now > _lastperiodstart + .5)
        {
          if (!_periodavg)
            _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
          else
            _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
          _lastperiodfetched = _fetchedsize;
          _lastperiodstart = now;
        }

      // wake up sleepers
      if (_sleepworkers)
        {
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              multifetchworker *worker = *workeriter;
              if (worker->_state != WORKER_SLEEP)
                continue;
              if (worker->_sleepuntil > now)
                continue;
              if (_minsleepuntil == worker->_sleepuntil)
                _minsleepuntil = 0;
              DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
              _sleepworkers--;
              // nextjob changes the state
              worker->nextjob();
            }
        }

      // collect all curl results, reschedule new jobs
      CURLMsg *msg;
      while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
        {
          if (msg->msg != CURLMSG_DONE)
            continue;
          CURL *easy = msg->easy_handle;
          CURLcode cc = msg->data.result;
          multifetchworker *worker;
          if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
          if (worker->_blkreceived && now > worker->_blkstarttime)
            {
              if (worker->_avgspeed)
                worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
              else
                worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
            }
          DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
          curl_multi_remove_handle(_multi, easy);
          if (cc == CURLE_HTTP_RETURNED_ERROR)
            {
              long statuscode = 0;
              (void)curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &statuscode);
              DBG << "HTTP status " << statuscode << endl;
              if (statuscode == 416 && !_blklist)       /* Range error */
                {
                  if (_filesize == off_t(-1))
                    {
                      if (!worker->_noendrange)
                        {
                          DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
                          worker->_noendrange = true;
                          worker->run();
                          continue;
                        }
                      worker->_noendrange = false;
                      worker->stealjob();
                      continue;
                    }
                  if (worker->_blkstart >= _filesize)
                    {
                      worker->nextjob();
                      continue;
                    }
                }
            }
          if (cc == 0)
            {
              if (!worker->checkChecksum())
                {
                  WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
                  worker->_state = WORKER_BROKEN;
                  strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
                  _activeworkers--;
                  continue;
                }
              if (worker->_state == WORKER_FETCH)
                {
                  if (worker->_competing)
                    {
                      worker->disableCompetition();
                      // multiple workers wrote into this block. We already know that our
                      // data was correct, but maybe some other worker overwrote our data
                      // with something broken. Thus we have to re-check the block.
                      if (!worker->recheckChecksum())
                        {
                          DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
                          // re-fetch! No need to worry about the bad workers,
                          // they will now be set to DISCARD. At the end of their block
                          // they will notice that they wrote bad data and go into BROKEN.
                          worker->run();
                          continue;
                        }
                    }
                  _fetchedgoodsize += worker->_blksize;
                }

              // make bad workers sleep a little
              double maxavg = 0;
              int maxworkerno = 0;
              int numbetter = 0;
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *oworker = *workeriter;
                  if (oworker->_state == WORKER_BROKEN)
                    continue;
                  if (oworker->_avgspeed > maxavg)
                    {
                      maxavg = oworker->_avgspeed;
                      maxworkerno = oworker->_workerno;
                    }
                  if (oworker->_avgspeed > worker->_avgspeed)
                    numbetter++;
                }
              if (maxavg && !_stealing)
                {
                  double ratio = worker->_avgspeed / maxavg;
                  ratio = 1 - ratio;
                  if (numbetter < 3)    // don't sleep that much if we're in the top two
                    ratio = ratio * ratio;
                  if (ratio > .01)
                    {
                      DBG << "#" << worker->_workerno << ": too slow (" << ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
                      worker->_sleepuntil = now + ratio;
                      worker->_state = WORKER_SLEEP;
                      _sleepworkers++;
                      continue;
                    }
                }

              // do rate control (if requested)
              // should use periodavg, but that's not what libcurl does
              if (_maxspeed && now > _starttime)
                {
                  double avg = _fetchedsize / (now - _starttime);
                  avg = worker->_maxspeed * _maxspeed / avg;
                  if (avg < _maxspeed / _maxworkers)
                    avg = _maxspeed / _maxworkers;
                  if (avg > _maxspeed)
                    avg = _maxspeed;
                  if (avg < 1024)
                    avg = 1024;
                  worker->_maxspeed = avg;
                  curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
                }

              worker->nextjob();
            }
          else
            {
              worker->_state = WORKER_BROKEN;
              _activeworkers--;
              if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
                {
                  // end of workers reached! goodbye!
                  worker->evaluateCurlCode(Pathname(), cc, false);
                }
            }
        }

      // send report
      if (_report)
        {
          int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
          double avg = 0;
          if (now > _starttime)
            avg = _fetchedsize / (now - _starttime);
          if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
            ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
        }

      if (_timeout && now - _lastprogress > _timeout)
        break;
    }

  if (!_finished)
    ZYPP_THROW(MediaTimeoutException(_baseurl));

  // print some download stats
  WAR << "overall result" << endl;
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
    }
}


MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
    : MediaCurl(url_r, attach_point_hint_r)
{
  MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
  _multi = 0;
  _customHeadersMetalink = 0;
}

MediaMultiCurl::~MediaMultiCurl()
{
  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  if (_multi)
    {
      curl_multi_cleanup(_multi);
      _multi = 0;
    }
  std::map<std::string, CURL *>::iterator it;
  for (it = _easypool.begin(); it != _easypool.end(); ++it)
    {
      CURL *easy = it->second;
      if (easy)
        {
          curl_easy_cleanup(easy);
          it->second = NULL;
        }
    }
}

void MediaMultiCurl::setupEasy()
{
  MediaCurl::setupEasy();

  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  struct curl_slist *sl = _customHeaders;
  for (; sl; sl = sl->next)
    _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
  _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml");
}

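// Cheap content sniffing: does the file start with an (optional) XML
// declaration followed by a "<metalink" root element? Needed because
// some proxies drop the metalink content type (bnc#649925).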
static bool looks_like_metalink(const Pathname & file)
{
  char buf[256], *p;
  int fd, l;
  if ((fd = open(file.asString().c_str(), O_RDONLY)) == -1)
    return false;
  while ((l = read(fd, buf, sizeof(buf) - 1)) == -1 && errno == EINTR)
    ;
  close(fd);
  if (l == -1)
    return false;
  buf[l] = 0;
  p = buf;
  while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
    p++;
  if (!strncasecmp(p, "<?xml", 5))
    {
      while (*p && *p != '>')
        p++;
      if (*p == '>')
        p++;
      while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
        p++;
    }
  bool ret = !strncasecmp(p, "<metalink", 9);
  DBG << "looks_like_metalink(" << file << "): " << ret << endl;
  return ret;
}

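// Download a file, transparently upgrading to a multi-mirror transfer:
// the initial MediaCurl request advertises metalink support via the
// Accept header; if the reply turns out to be a metalink document it
// is parsed, reusable blocks are harvested from the old target file, a
// previously failed attempt and the delta file, and the remaining
// blocks are fetched from the mirror list via multifetch(). On any
// metalink failure the code falls back to a plain download.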
void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
{
  Pathname dest = target.absolutename();
  if( assert_dir( dest.dirname() ) )
  {
    DBG << "assert_dir " << dest.dirname() << " failed" << endl;
    Url url(getFileUrl(filename));
    ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) );
  }
  string destNew = target.asString() + ".new.zypp.XXXXXX";
  char *buf = ::strdup( destNew.c_str());
  if( !buf)
  {
    ERR << "out of memory for temp file name" << endl;
    Url url(getFileUrl(filename));
    ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name"));
  }

  int tmp_fd = ::mkstemp( buf );
  if( tmp_fd == -1)
  {
    free( buf);
    ERR << "mkstemp failed for file '" << destNew << "'" << endl;
    ZYPP_THROW(MediaWriteException(destNew));
  }
  destNew = buf;
  free( buf);

  FILE *file = ::fdopen( tmp_fd, "w" );
  if ( !file ) {
    ::close( tmp_fd);
    filesystem::unlink( destNew );
    ERR << "fopen failed for file '" << destNew << "'" << endl;
    ZYPP_THROW(MediaWriteException(destNew));
  }
  DBG << "dest: " << dest << endl;
  DBG << "temp: " << destNew << endl;

  // set IFMODSINCE time condition (no download if not modified)
  if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
  }
  else
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  }
  // change header to include Accept: metalink
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
  try
    {
      MediaCurl::doGetFileCopyFile(filename, dest, file, report, options);
    }
  catch (Exception &ex)
    {
      ::fclose(file);
      filesystem::unlink(destNew);
      curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
      curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
      curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
      ZYPP_RETHROW(ex);
    }
  curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
  curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
  long httpReturnCode = 0;
  CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
  if (infoRet == CURLE_OK)
  {
    DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
    if ( httpReturnCode == 304
         || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
    {
      DBG << "not modified: " << PathInfo(dest) << endl;
      return;
    }
  }
  else
  {
    WAR << "Could not get the response code." << endl;
  }

  bool ismetalink = false;

  char *ptr = NULL;
  if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
    {
      string ct = string(ptr);
      if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
        ismetalink = true;
    }

  if (!ismetalink)
    {
      // some proxies do not store the content type, so also look at the file to find
      // out if we received a metalink (bnc#649925)
      fflush(file);
      if (looks_like_metalink(Pathname(destNew)))
        ismetalink = true;
    }

  if (ismetalink)
    {
      bool userabort = false;
      fclose(file);
      file = NULL;
      Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
      try
        {
          MetaLinkParser mlp;
          mlp.parse(Pathname(destNew));
          MediaBlockList bl = mlp.getBlockList();
          vector<Url> urls = mlp.getUrls();
          DBG << bl << endl;
          file = fopen(destNew.c_str(), "w+");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          if (PathInfo(target).isExist())
            {
              DBG << "reusing blocks from file " << target << endl;
              bl.reuseBlocks(file, target.asString());
              DBG << bl << endl;
            }
          if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
            {
              DBG << "reusing blocks from file " << failedFile << endl;
              bl.reuseBlocks(file, failedFile.asString());
              DBG << bl << endl;
              filesystem::unlink(failedFile);
            }
          Pathname df = deltafile();
          if (!df.empty())
            {
              DBG << "reusing blocks from file " << df << endl;
              bl.reuseBlocks(file, df.asString());
              DBG << bl << endl;
            }
          try
            {
              multifetch(filename, file, &urls, &report, &bl);
            }
          catch (MediaCurlException &ex)
            {
              userabort = ex.errstr() == "User abort";
              ZYPP_RETHROW(ex);
            }
        }
      catch (Exception &ex)
        {
          // something went wrong. fall back to normal download
          if (file)
            fclose(file);
          file = NULL;
          // keep bigger partial results around so their blocks can be reused on retry
          if (PathInfo(destNew).size() >= 65536)
            {
              ::unlink(failedFile.asString().c_str());
              filesystem::hardlinkCopy(destNew, failedFile);
            }
          if (userabort)
            {
              filesystem::unlink(destNew);
              ZYPP_RETHROW(ex);
            }
          file = fopen(destNew.c_str(), "w+");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START);
        }
    }

  if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
    {
      ERR << "Failed to chmod file " << destNew << endl;
    }
  if (::fclose(file))
    {
      filesystem::unlink(destNew);
      ERR << "Fclose failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
  if ( rename( destNew, dest ) != 0 )
    {
      ERR << "Rename failed" << endl;
      ZYPP_THROW(MediaWriteException(dest));
    }
  DBG << "done: " << PathInfo(dest) << endl;
}

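// Set up and run a multifetchrequest over the mirror list: take sizes
// from the block list if necessary, clamp the worker count to
// 1..MAXURLS, keep only http/https/ftp mirrors (falling back to the
// base url if none survive) and verify the whole-file digest at the
// end.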
void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const
{
  Url baseurl(getFileUrl(filename));
  if (blklist && filesize == off_t(-1) && blklist->haveFilesize())
    filesize = blklist->getFilesize();
  if (blklist && !blklist->haveBlocks() && filesize != 0)
    blklist = 0;
  if (blklist && (filesize == 0 || !blklist->numBlocks()))
    {
      checkFileDigest(baseurl, fp, blklist);
      return;
    }
  if (filesize == 0)
    return;
  if (!_multi)
    {
      _multi = curl_multi_init();
      if (!_multi)
        ZYPP_THROW(MediaCurlInitException(baseurl));
    }
  multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize);
  req._timeout = _settings.timeout();
  req._connect_timeout = _settings.connectTimeout();
  req._maxspeed = _settings.maxDownloadSpeed();
  req._maxworkers = _settings.maxConcurrentConnections();
  if (req._maxworkers > MAXURLS)
    req._maxworkers = MAXURLS;
  if (req._maxworkers <= 0)
    req._maxworkers = 1;
  std::vector<Url> myurllist;
  for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
    {
      try
        {
          string scheme = urliter->getScheme();
          if (scheme == "http" || scheme == "https" || scheme == "ftp")
            {
              checkProtocol(*urliter);
              myurllist.push_back(*urliter);
            }
        }
      catch (...)
        {
        }
    }
  if (myurllist.empty())
    myurllist.push_back(baseurl);
  req.run(myurllist);
  checkFileDigest(baseurl, fp, blklist);
}

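// Verify the completed file against the metalink whole-file checksum,
// if one was provided, by re-reading it from the start.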
void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const
{
  if (!blklist || !blklist->haveFileChecksum())
    return;
  if (fseeko(fp, off_t(0), SEEK_SET))
    ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
  Digest dig;
  blklist->createFileDigest(dig);
  char buf[4096];
  size_t l;
  while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
    dig.update(buf, l);
  if (!blklist->verifyFileDigest(dig))
    ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
}

bool MediaMultiCurl::isDNSok(const string &host) const
{
  return _dnsok.find(host) != _dnsok.end();
}

void MediaMultiCurl::setDNSok(const string &host) const
{
  _dnsok.insert(host);
}

CURL *MediaMultiCurl::fromEasyPool(const string &host) const
{
  if (_easypool.find(host) == _easypool.end())
    return 0;
  CURL *ret = _easypool[host];
  _easypool.erase(host);
  return ret;
}

void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
{
  CURL *oldeasy = _easypool[host];
  _easypool[host] = easy;
  if (oldeasy)
    curl_easy_cleanup(oldeasy);
}

  } // namespace media
} // namespace zypp