00001
00002
00003
00004
00005
00006
00007
00008
00013 #include <ctype.h>
00014 #include <sys/types.h>
00015 #include <signal.h>
00016 #include <sys/wait.h>
00017 #include <netdb.h>
00018 #include <arpa/inet.h>
00019
00020 #include <vector>
00021 #include <iostream>
00022 #include <algorithm>
00023
00024
00025 #include "zypp/ZConfig.h"
00026 #include "zypp/base/Logger.h"
00027 #include "zypp/media/MediaMultiCurl.h"
00028 #include "zypp/media/MetaLinkParser.h"
00029
00030 using namespace std;
00031 using namespace zypp::base;
00032
00033 #undef CURLVERSION_AT_LEAST
00034 #define CURLVERSION_AT_LEAST(M,N,O) LIBCURL_VERSION_NUM >= ((((M)<<8)+(N))<<8)+(O)
00035
00036 namespace zypp {
00037 namespace media {
00038
00039
00041
00042
00043 class multifetchrequest;
00044
00045
00046
00047
00048 class multifetchworker : MediaCurl {
00049 friend class multifetchrequest;
00050
00051 public:
00052 multifetchworker(int no, multifetchrequest &request, const Url &url);
00053 ~multifetchworker();
00054 void nextjob();
00055 void run();
00056 bool checkChecksum();
00057 bool recheckChecksum();
00058 void disableCompetition();
00059
00060 void checkdns();
00061 void adddnsfd(fd_set &rset, int &maxfd);
00062 void dnsevent(fd_set &rset);
00063
00064 int _workerno;
00065
00066 int _state;
00067 bool _competing;
00068
00069 size_t _blkno;
00070 off_t _blkstart;
00071 size_t _blksize;
00072 bool _noendrange;
00073
00074 double _blkstarttime;
00075 size_t _blkreceived;
00076 off_t _received;
00077
00078 double _avgspeed;
00079 double _maxspeed;
00080
00081 double _sleepuntil;
00082
00083 private:
00084 void stealjob();
00085
00086 size_t writefunction(void *ptr, size_t size);
00087 static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);
00088
00089 size_t headerfunction(char *ptr, size_t size);
00090 static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);
00091
00092 multifetchrequest *_request;
00093 int _pass;
00094 string _urlbuf;
00095 off_t _off;
00096 size_t _size;
00097 Digest _dig;
00098
00099 pid_t _pid;
00100 int _dnspipe;
00101 };
00102
00103 #define WORKER_STARTING 0
00104 #define WORKER_LOOKUP 1
00105 #define WORKER_FETCH 2
00106 #define WORKER_DISCARD 3
00107 #define WORKER_DONE 4
00108 #define WORKER_SLEEP 5
00109 #define WORKER_BROKEN 6
00110
00111
00112
00113 class multifetchrequest {
00114 public:
00115 multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize);
00116 ~multifetchrequest();
00117
00118 void run(std::vector<Url> &urllist);
00119
00120 protected:
00121 friend class multifetchworker;
00122
00123 const MediaMultiCurl *_context;
00124 const Pathname _filename;
00125 Url _baseurl;
00126
00127 FILE *_fp;
00128 callback::SendReport<DownloadProgressReport> *_report;
00129 MediaBlockList *_blklist;
00130 off_t _filesize;
00131
00132 CURLM *_multi;
00133
00134 std::list<multifetchworker *> _workers;
00135 bool _stealing;
00136 bool _havenewjob;
00137
00138 size_t _blkno;
00139 off_t _blkoff;
00140 size_t _activeworkers;
00141 size_t _lookupworkers;
00142 size_t _sleepworkers;
00143 double _minsleepuntil;
00144 bool _finished;
00145 off_t _totalsize;
00146 off_t _fetchedsize;
00147 off_t _fetchedgoodsize;
00148
00149 double _starttime;
00150 double _lastprogress;
00151
00152 double _lastperiodstart;
00153 double _lastperiodfetched;
00154 double _periodavg;
00155
00156 public:
00157 double _timeout;
00158 double _connect_timeout;
00159 double _maxspeed;
00160 int _maxworkers;
00161 };
00162
00163 #define BLKSIZE 131072
00164 #define MAXURLS 10
00165
00166
00168
00169 static double
00170 currentTime()
00171 {
00172 struct timeval tv;
00173 if (gettimeofday(&tv, NULL))
00174 return 0;
00175 return tv.tv_sec + tv.tv_usec / 1000000.;
00176 }
00177
00178 size_t
00179 multifetchworker::writefunction(void *ptr, size_t size)
00180 {
00181 size_t len, cnt;
00182 if (_state == WORKER_BROKEN)
00183 return size ? 0 : 1;
00184
00185 double now = currentTime();
00186
00187 len = size > _size ? _size : size;
00188 if (!len)
00189 {
00190
00191 return size;
00192 }
00193
00194 if (_blkstart && _off == _blkstart)
00195 {
00196
00197
00198 char *effurl;
00199 (void)curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &effurl);
00200 if (effurl && !strncasecmp(effurl, "http", 4))
00201 {
00202 long statuscode = 0;
00203 (void)curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &statuscode);
00204 if (statuscode != 206)
00205 return size ? 0 : 1;
00206 }
00207 }
00208
00209 _blkreceived += len;
00210 _received += len;
00211
00212 _request->_lastprogress = now;
00213
00214 if (_state == WORKER_DISCARD || !_request->_fp)
00215 {
00216
00217
00218 if (_request->_blklist)
00219 _dig.update((const char *)ptr, len);
00220 _off += len;
00221 _size -= len;
00222 return size;
00223 }
00224 if (fseeko(_request->_fp, _off, SEEK_SET))
00225 return size ? 0 : 1;
00226 cnt = fwrite(ptr, 1, len, _request->_fp);
00227 if (cnt > 0)
00228 {
00229 _request->_fetchedsize += cnt;
00230 if (_request->_blklist)
00231 _dig.update((const char *)ptr, cnt);
00232 _off += cnt;
00233 _size -= cnt;
00234 if (cnt == len)
00235 return size;
00236 }
00237 return cnt;
00238 }
00239
00240 size_t
00241 multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
00242 {
00243 multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
00244 return me->writefunction(ptr, size * nmemb);
00245 }
00246
00247 size_t
00248 multifetchworker::headerfunction(char *p, size_t size)
00249 {
00250 size_t l = size;
00251 if (l > 9 && !strncasecmp(p, "Location:", 9))
00252 {
00253 string line(p + 9, l - 9);
00254 if (line[l - 10] == '\r')
00255 line.erase(l - 10, 1);
00256 DBG << "#" << _workerno << ": redirecting to" << line << endl;
00257 return size;
00258 }
00259 if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
00260 return size;
00261 p += 14;
00262 l -= 14;
00263 while (l && (*p == ' ' || *p == '\t'))
00264 p++, l--;
00265 if (l < 6 || strncasecmp(p, "bytes", 5))
00266 return size;
00267 p += 5;
00268 l -= 5;
00269 char buf[128];
00270 memcpy(buf, p, l);
00271 buf[l] = 0;
00272 unsigned long long start, off, filesize;
00273 if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
00274 return size;
00275 if (_request->_filesize == (off_t)-1)
00276 {
00277 WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
00278 _request->_filesize = filesize;
00279 if (_request->_totalsize == 0 && !_request->_blklist)
00280 _request->_totalsize = filesize;
00281 }
00282 if (_request->_filesize != (off_t)filesize)
00283 {
00284 DBG << "#" << _workerno << ": filesize mismatch" << endl;
00285 _state = WORKER_BROKEN;
00286 strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
00287 }
00288 return size;
00289 }
00290
00291 size_t
00292 multifetchworker::_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
00293 {
00294 multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
00295 return me->headerfunction((char *)ptr, size * nmemb);
00296 }
00297
00298 multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
00299 : MediaCurl(url, Pathname())
00300 {
00301 _workerno = no;
00302 _request = &request;
00303 _state = WORKER_STARTING;
00304 _competing = false;
00305 _off = _blkstart = 0;
00306 _size = _blksize = 0;
00307 _pass = 0;
00308 _blkno = 0;
00309 _pid = 0;
00310 _dnspipe = -1;
00311 _blkreceived = 0;
00312 _received = 0;
00313 _blkstarttime = 0;
00314 _avgspeed = 0;
00315 _sleepuntil = 0;
00316 _maxspeed = _request->_maxspeed;
00317 _noendrange = false;
00318
00319 Url curlUrl( clearQueryString(url) );
00320 _urlbuf = curlUrl.asString();
00321 _curl = _request->_context->fromEasyPool(_url.getHost());
00322 if (_curl)
00323 DBG << "reused worker from pool" << endl;
00324 if (!_curl && !(_curl = curl_easy_init()))
00325 {
00326 _state = WORKER_BROKEN;
00327 strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
00328 return;
00329 }
00330 try
00331 {
00332 setupEasy();
00333 }
00334 catch (Exception &ex)
00335 {
00336 curl_easy_cleanup(_curl);
00337 _curl = 0;
00338 _state = WORKER_BROKEN;
00339 strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
00340 return;
00341 }
00342 curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
00343 curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
00344 curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
00345 curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
00346 if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
00347 {
00348 curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
00349 curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
00350 }
00351
00352
00353
00354 if (url.getHost() == _request->_context->_url.getHost())
00355 {
00356 _settings.setUsername(_request->_context->_settings.username());
00357 _settings.setPassword(_request->_context->_settings.password());
00358 _settings.setAuthType(_request->_context->_settings.authType());
00359 if ( _settings.userPassword().size() )
00360 {
00361 curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
00362 string use_auth = _settings.authType();
00363 if (use_auth.empty())
00364 use_auth = "digest,basic";
00365 long auth = CurlAuthData::auth_type_str2long(use_auth);
00366 if( auth != CURLAUTH_NONE)
00367 {
00368 DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
00369 << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
00370 curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
00371 }
00372 }
00373 }
00374 checkdns();
00375 }
00376
00377 multifetchworker::~multifetchworker()
00378 {
00379 if (_curl)
00380 {
00381 if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
00382 curl_multi_remove_handle(_request->_multi, _curl);
00383 if (_state == WORKER_DONE || _state == WORKER_SLEEP)
00384 {
00385 #if CURLVERSION_AT_LEAST(7,15,5)
00386 curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
00387 #endif
00388 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
00389 curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
00390 curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
00391 curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
00392 curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
00393 _request->_context->toEasyPool(_url.getHost(), _curl);
00394 }
00395 else
00396 curl_easy_cleanup(_curl);
00397 _curl = 0;
00398 }
00399 if (_pid)
00400 {
00401 kill(_pid, SIGKILL);
00402 int status;
00403 while (waitpid(_pid, &status, 0) == -1)
00404 if (errno != EINTR)
00405 break;
00406 _pid = 0;
00407 }
00408 if (_dnspipe != -1)
00409 {
00410 close(_dnspipe);
00411 _dnspipe = -1;
00412 }
00413
00414
00415 disconnectFrom();
00416 }
00417
00418 static inline bool env_isset(string name)
00419 {
00420 const char *s = getenv(name.c_str());
00421 return s && *s ? true : false;
00422 }
00423
00424 void
00425 multifetchworker::checkdns()
00426 {
00427 string host = _url.getHost();
00428
00429 if (host.empty())
00430 return;
00431
00432 if (_request->_context->isDNSok(host))
00433 return;
00434
00435
00436 char addrbuf[128];
00437 if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
00438 return;
00439 if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
00440 return;
00441
00442
00443 if (!_settings.proxy().empty())
00444 return;
00445 if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
00446 return;
00447 string schemeproxy = _url.getScheme() + "_proxy";
00448 if (env_isset(schemeproxy))
00449 return;
00450 if (schemeproxy != "http_proxy")
00451 {
00452 std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
00453 if (env_isset(schemeproxy))
00454 return;
00455 }
00456
00457 DBG << "checking DNS lookup of " << host << endl;
00458 int pipefds[2];
00459 if (pipe(pipefds))
00460 {
00461 _state = WORKER_BROKEN;
00462 strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
00463 return;
00464 }
00465 _pid = fork();
00466 if (_pid == pid_t(-1))
00467 {
00468 close(pipefds[0]);
00469 close(pipefds[1]);
00470 _pid = 0;
00471 _state = WORKER_BROKEN;
00472 strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
00473 return;
00474 }
00475 else if (_pid == 0)
00476 {
00477 close(pipefds[0]);
00478
00479 struct addrinfo *ai, aihints;
00480 memset(&aihints, 0, sizeof(aihints));
00481 aihints.ai_family = PF_UNSPEC;
00482 int tstsock = socket(PF_INET6, SOCK_DGRAM, 0);
00483 if (tstsock == -1)
00484 aihints.ai_family = PF_INET;
00485 else
00486 close(tstsock);
00487 aihints.ai_socktype = SOCK_STREAM;
00488 aihints.ai_flags = AI_CANONNAME;
00489 unsigned int connecttimeout = _request->_connect_timeout;
00490 if (connecttimeout)
00491 alarm(connecttimeout);
00492 signal(SIGALRM, SIG_DFL);
00493 if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
00494 _exit(1);
00495 _exit(0);
00496 }
00497 close(pipefds[1]);
00498 _dnspipe = pipefds[0];
00499 _state = WORKER_LOOKUP;
00500 }
00501
00502 void
00503 multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
00504 {
00505 if (_state != WORKER_LOOKUP)
00506 return;
00507 FD_SET(_dnspipe, &rset);
00508 if (maxfd < _dnspipe)
00509 maxfd = _dnspipe;
00510 }
00511
00512 void
00513 multifetchworker::dnsevent(fd_set &rset)
00514 {
00515
00516 if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
00517 return;
00518 int status;
00519 while (waitpid(_pid, &status, 0) == -1)
00520 {
00521 if (errno != EINTR)
00522 return;
00523 }
00524 _pid = 0;
00525 if (_dnspipe != -1)
00526 {
00527 close(_dnspipe);
00528 _dnspipe = -1;
00529 }
00530 if (!WIFEXITED(status))
00531 {
00532 _state = WORKER_BROKEN;
00533 strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
00534 _request->_activeworkers--;
00535 return;
00536 }
00537 int exitcode = WEXITSTATUS(status);
00538 DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
00539 if (exitcode != 0)
00540 {
00541 _state = WORKER_BROKEN;
00542 strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
00543 _request->_activeworkers--;
00544 return;
00545 }
00546 _request->_context->setDNSok(_url.getHost());
00547 nextjob();
00548 }
00549
00550 bool
00551 multifetchworker::checkChecksum()
00552 {
00553
00554 if (!_blksize || !_request->_blklist)
00555 return true;
00556 return _request->_blklist->verifyDigest(_blkno, _dig);
00557 }
00558
00559 bool
00560 multifetchworker::recheckChecksum()
00561 {
00562
00563 if (!_request->_fp || !_blksize || !_request->_blklist)
00564 return true;
00565 if (fseeko(_request->_fp, _blkstart, SEEK_SET))
00566 return false;
00567 char buf[4096];
00568 size_t l = _blksize;
00569 _request->_blklist->createDigest(_dig);
00570 while (l)
00571 {
00572 size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
00573 if (fread(buf, cnt, 1, _request->_fp) != 1)
00574 return false;
00575 _dig.update(buf, cnt);
00576 l -= cnt;
00577 }
00578 return _request->_blklist->verifyDigest(_blkno, _dig);
00579 }
00580
00581
00582 void
00583 multifetchworker::stealjob()
00584 {
00585 if (!_request->_stealing)
00586 {
00587 DBG << "start stealing!" << endl;
00588 _request->_stealing = true;
00589 }
00590 multifetchworker *best = 0;
00591 std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
00592 double now = 0;
00593 for (; workeriter != _request->_workers.end(); ++workeriter)
00594 {
00595 multifetchworker *worker = *workeriter;
00596 if (worker == this)
00597 continue;
00598 if (worker->_pass == -1)
00599 continue;
00600 if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
00601 continue;
00602 if (!worker->_avgspeed && worker->_blkreceived)
00603 {
00604 if (!now)
00605 now = currentTime();
00606 if (now > worker->_blkstarttime)
00607 worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
00608 }
00609 if (!best || best->_pass > worker->_pass)
00610 {
00611 best = worker;
00612 continue;
00613 }
00614 if (best->_pass < worker->_pass)
00615 continue;
00616
00617 if (worker->_blkstart == best->_blkstart)
00618 {
00619 if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
00620 best = worker;
00621 }
00622 else
00623 {
00624 if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
00625 best = worker;
00626 }
00627 }
00628 if (!best)
00629 {
00630 _state = WORKER_DONE;
00631 _request->_activeworkers--;
00632 _request->_finished = true;
00633 return;
00634 }
00635
00636 if (_state != WORKER_SLEEP)
00637 {
00638 if (!_avgspeed && _blkreceived)
00639 {
00640 if (!now)
00641 now = currentTime();
00642 if (now > _blkstarttime)
00643 _avgspeed = _blkreceived / (now - _blkstarttime);
00644 }
00645
00646
00647 DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
00648 DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
00649 if (_avgspeed && best->_avgspeed && best->_blksize - best->_blkreceived > 0 &&
00650 (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
00651 {
00652 if (!now)
00653 now = currentTime();
00654 double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
00655 if (sl > 1)
00656 sl = 1;
00657 DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
00658 _sleepuntil = now + sl;
00659 _state = WORKER_SLEEP;
00660 _request->_sleepworkers++;
00661 return;
00662 }
00663 }
00664
00665 _competing = true;
00666 best->_competing = true;
00667 _blkstart = best->_blkstart;
00668 _blksize = best->_blksize;
00669 best->_pass++;
00670 _pass = best->_pass;
00671 _blkno = best->_blkno;
00672 run();
00673 }
00674
00675 void
00676 multifetchworker::disableCompetition()
00677 {
00678 std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
00679 for (; workeriter != _request->_workers.end(); ++workeriter)
00680 {
00681 multifetchworker *worker = *workeriter;
00682 if (worker == this)
00683 continue;
00684 if (worker->_blkstart == _blkstart)
00685 {
00686 if (worker->_state == WORKER_FETCH)
00687 worker->_state = WORKER_DISCARD;
00688 worker->_pass = -1;
00689 }
00690 }
00691 }
00692
00693
00694 void
00695 multifetchworker::nextjob()
00696 {
00697 _noendrange = false;
00698 if (_request->_stealing)
00699 {
00700 stealjob();
00701 return;
00702 }
00703
00704 MediaBlockList *blklist = _request->_blklist;
00705 if (!blklist)
00706 {
00707 _blksize = BLKSIZE;
00708 if (_request->_filesize != off_t(-1))
00709 {
00710 if (_request->_blkoff >= _request->_filesize)
00711 {
00712 stealjob();
00713 return;
00714 }
00715 _blksize = _request->_filesize - _request->_blkoff;
00716 if (_blksize > BLKSIZE)
00717 _blksize = BLKSIZE;
00718 }
00719 }
00720 else
00721 {
00722 MediaBlock blk = blklist->getBlock(_request->_blkno);
00723 while (_request->_blkoff >= (off_t)(blk.off + blk.size))
00724 {
00725 if (++_request->_blkno == blklist->numBlocks())
00726 {
00727 stealjob();
00728 return;
00729 }
00730 blk = blklist->getBlock(_request->_blkno);
00731 _request->_blkoff = blk.off;
00732 }
00733 _blksize = blk.off + blk.size - _request->_blkoff;
00734 if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
00735 _blksize = BLKSIZE;
00736 }
00737 _blkno = _request->_blkno;
00738 _blkstart = _request->_blkoff;
00739 _request->_blkoff += _blksize;
00740 run();
00741 }
00742
00743 void
00744 multifetchworker::run()
00745 {
00746 char rangebuf[128];
00747
00748 if (_state == WORKER_BROKEN || _state == WORKER_DONE)
00749 return;
00750 if (_noendrange)
00751 sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
00752 else
00753 sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
00754 DBG << "#" << _workerno << ": BLK " << _blkno << ":" << rangebuf << " " << _url << endl;
00755 if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
00756 {
00757 _request->_activeworkers--;
00758 _state = WORKER_BROKEN;
00759 strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
00760 return;
00761 }
00762 if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
00763 {
00764 _request->_activeworkers--;
00765 _state = WORKER_BROKEN;
00766 strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
00767 return;
00768 }
00769 _request->_havenewjob = true;
00770 _off = _blkstart;
00771 _size = _blksize;
00772 if (_request->_blklist)
00773 _request->_blklist->createDigest(_dig);
00774 _state = WORKER_FETCH;
00775
00776 double now = currentTime();
00777 _blkstarttime = now;
00778 _blkreceived = 0;
00779 }
00780
00781
00783
00784
00785 multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) : _context(context), _filename(filename), _baseurl(baseurl)
00786 {
00787 _fp = fp;
00788 _report = report;
00789 _blklist = blklist;
00790 _filesize = filesize;
00791 _multi = multi;
00792 _stealing = false;
00793 _havenewjob = false;
00794 _blkno = 0;
00795 if (_blklist)
00796 _blkoff = _blklist->getBlock(0).off;
00797 else
00798 _blkoff = 0;
00799 _activeworkers = 0;
00800 _lookupworkers = 0;
00801 _sleepworkers = 0;
00802 _minsleepuntil = 0;
00803 _finished = false;
00804 _fetchedsize = 0;
00805 _fetchedgoodsize = 0;
00806 _totalsize = 0;
00807 _lastperiodstart = _lastprogress = _starttime = currentTime();
00808 _lastperiodfetched = 0;
00809 _periodavg = 0;
00810 _timeout = 0;
00811 _connect_timeout = 0;
00812 _maxspeed = 0;
00813 _maxworkers = 0;
00814 if (blklist)
00815 {
00816 for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
00817 {
00818 MediaBlock blk = blklist->getBlock(blkno);
00819 _totalsize += blk.size;
00820 }
00821 }
00822 else if (filesize != off_t(-1))
00823 _totalsize = filesize;
00824 }
00825
00826 multifetchrequest::~multifetchrequest()
00827 {
00828 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00829 {
00830 multifetchworker *worker = *workeriter;
00831 *workeriter = NULL;
00832 delete worker;
00833 }
00834 _workers.clear();
00835 }
00836
00837 void
00838 multifetchrequest::run(std::vector<Url> &urllist)
00839 {
00840 int workerno = 0;
00841 std::vector<Url>::iterator urliter = urllist.begin();
00842 for (;;)
00843 {
00844 fd_set rset, wset, xset;
00845 int maxfd, nqueue;
00846
00847 if (_finished)
00848 {
00849 DBG << "finished!" << endl;
00850 break;
00851 }
00852
00853 if ((int)_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
00854 {
00855
00856 multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
00857 _workers.push_back(worker);
00858 if (worker->_state != WORKER_BROKEN)
00859 {
00860 _activeworkers++;
00861 if (worker->_state != WORKER_LOOKUP)
00862 {
00863 worker->nextjob();
00864 }
00865 else
00866 _lookupworkers++;
00867 }
00868 ++urliter;
00869 continue;
00870 }
00871 if (!_activeworkers)
00872 {
00873 WAR << "No more active workers!" << endl;
00874
00875 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00876 {
00877 if ((*workeriter)->_state != WORKER_BROKEN)
00878 continue;
00879 ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
00880 }
00881 break;
00882 }
00883
00884 FD_ZERO(&rset);
00885 FD_ZERO(&wset);
00886 FD_ZERO(&xset);
00887
00888 curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);
00889
00890 if (_lookupworkers)
00891 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00892 (*workeriter)->adddnsfd(rset, maxfd);
00893
00894 timeval tv;
00895
00896
00897 tv.tv_sec = 0;
00898 tv.tv_usec = _havenewjob ? 0 : 200000;
00899 if (_sleepworkers && !_havenewjob)
00900 {
00901 if (_minsleepuntil == 0)
00902 {
00903 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00904 {
00905 multifetchworker *worker = *workeriter;
00906 if (worker->_state != WORKER_SLEEP)
00907 continue;
00908 if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
00909 _minsleepuntil = worker->_sleepuntil;
00910 }
00911 }
00912 double sl = _minsleepuntil - currentTime();
00913 if (sl < 0)
00914 {
00915 sl = 0;
00916 _minsleepuntil = 0;
00917 }
00918 if (sl < .2)
00919 tv.tv_usec = sl * 1000000;
00920 }
00921 int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
00922 if (r == -1 && errno != EINTR)
00923 ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
00924 if (r != 0 && _lookupworkers)
00925 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00926 {
00927 multifetchworker *worker = *workeriter;
00928 if (worker->_state != WORKER_LOOKUP)
00929 continue;
00930 (*workeriter)->dnsevent(rset);
00931 if (worker->_state != WORKER_LOOKUP)
00932 _lookupworkers--;
00933 }
00934 _havenewjob = false;
00935
00936
00937 for (;;)
00938 {
00939 CURLMcode mcode;
00940 int tasks;
00941 mcode = curl_multi_perform(_multi, &tasks);
00942 if (mcode == CURLM_CALL_MULTI_PERFORM)
00943 continue;
00944 if (mcode != CURLM_OK)
00945 ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
00946 break;
00947 }
00948
00949 double now = currentTime();
00950
00951
00952 if (now > _lastperiodstart + .5)
00953 {
00954 if (!_periodavg)
00955 _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
00956 else
00957 _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
00958 _lastperiodfetched = _fetchedsize;
00959 _lastperiodstart = now;
00960 }
00961
00962
00963 if (_sleepworkers)
00964 {
00965 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
00966 {
00967 multifetchworker *worker = *workeriter;
00968 if (worker->_state != WORKER_SLEEP)
00969 continue;
00970 if (worker->_sleepuntil > now)
00971 continue;
00972 if (_minsleepuntil == worker->_sleepuntil)
00973 _minsleepuntil = 0;
00974 DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
00975 _sleepworkers--;
00976
00977 worker->nextjob();
00978 }
00979 }
00980
00981
00982 CURLMsg *msg;
00983 while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
00984 {
00985 if (msg->msg != CURLMSG_DONE)
00986 continue;
00987 CURL *easy = msg->easy_handle;
00988 CURLcode cc = msg->data.result;
00989 multifetchworker *worker;
00990 if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
00991 ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
00992 if (worker->_blkreceived && now > worker->_blkstarttime)
00993 {
00994 if (worker->_avgspeed)
00995 worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
00996 else
00997 worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
00998 }
00999 DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
01000 curl_multi_remove_handle(_multi, easy);
01001 if (cc == CURLE_HTTP_RETURNED_ERROR)
01002 {
01003 long statuscode = 0;
01004 (void)curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &statuscode);
01005 DBG << "HTTP status " << statuscode << endl;
01006 if (statuscode == 416 && !_blklist)
01007 {
01008 if (_filesize == off_t(-1))
01009 {
01010 if (!worker->_noendrange)
01011 {
01012 DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
01013 worker->_noendrange = true;
01014 worker->run();
01015 continue;
01016 }
01017 worker->_noendrange = false;
01018 worker->stealjob();
01019 continue;
01020 }
01021 if (worker->_blkstart >= _filesize)
01022 {
01023 worker->nextjob();
01024 continue;
01025 }
01026 }
01027 }
01028 if (cc == 0)
01029 {
01030 if (!worker->checkChecksum())
01031 {
01032 WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
01033 worker->_state = WORKER_BROKEN;
01034 strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
01035 _activeworkers--;
01036 continue;
01037 }
01038 if (worker->_state == WORKER_FETCH)
01039 {
01040 if (worker->_competing)
01041 {
01042 worker->disableCompetition();
01043
01044
01045
01046 if (!worker->recheckChecksum())
01047 {
01048 DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
01049
01050
01051
01052 worker->run();
01053 continue;
01054 }
01055 }
01056 _fetchedgoodsize += worker->_blksize;
01057 }
01058
01059
01060 double maxavg = 0;
01061 int maxworkerno = 0;
01062 int numbetter = 0;
01063 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
01064 {
01065 multifetchworker *oworker = *workeriter;
01066 if (oworker->_state == WORKER_BROKEN)
01067 continue;
01068 if (oworker->_avgspeed > maxavg)
01069 {
01070 maxavg = oworker->_avgspeed;
01071 maxworkerno = oworker->_workerno;
01072 }
01073 if (oworker->_avgspeed > worker->_avgspeed)
01074 numbetter++;
01075 }
01076 if (maxavg && !_stealing)
01077 {
01078 double ratio = worker->_avgspeed / maxavg;
01079 ratio = 1 - ratio;
01080 if (numbetter < 3)
01081 ratio = ratio * ratio;
01082 if (ratio > .01)
01083 {
01084 DBG << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
01085 worker->_sleepuntil = now + ratio;
01086 worker->_state = WORKER_SLEEP;
01087 _sleepworkers++;
01088 continue;
01089 }
01090 }
01091
01092
01093
01094 if (_maxspeed && now > _starttime)
01095 {
01096 double avg = _fetchedsize / (now - _starttime);
01097 avg = worker->_maxspeed * _maxspeed / avg;
01098 if (avg < _maxspeed / _maxworkers)
01099 avg = _maxspeed / _maxworkers;
01100 if (avg > _maxspeed)
01101 avg = _maxspeed;
01102 if (avg < 1024)
01103 avg = 1024;
01104 worker->_maxspeed = avg;
01105 #if CURLVERSION_AT_LEAST(7,15,5)
01106 curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
01107 #endif
01108 }
01109
01110 worker->nextjob();
01111 }
01112 else
01113 {
01114 worker->_state = WORKER_BROKEN;
01115 _activeworkers--;
01116 if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
01117 {
01118
01119 worker->evaluateCurlCode(Pathname(), cc, false);
01120 }
01121 }
01122 }
01123
01124
01125 if (_report)
01126 {
01127 int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
01128 double avg = 0;
01129 if (now > _starttime)
01130 avg = _fetchedsize / (now - _starttime);
01131 if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
01132 ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
01133 }
01134
01135 if (_timeout && now - _lastprogress > _timeout)
01136 break;
01137 }
01138
01139 if (!_finished)
01140 ZYPP_THROW(MediaTimeoutException(_baseurl));
01141
01142
01143 WAR << "overall result" << endl;
01144 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
01145 {
01146 multifetchworker *worker = *workeriter;
01147 WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
01148 }
01149 }
01150
01151
01153
01154
01155 MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
01156 : MediaCurl(url_r, attach_point_hint_r)
01157 {
01158 MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
01159 _multi = 0;
01160 _customHeadersMetalink = 0;
01161 }
01162
01163 MediaMultiCurl::~MediaMultiCurl()
01164 {
01165 if (_customHeadersMetalink)
01166 {
01167 curl_slist_free_all(_customHeadersMetalink);
01168 _customHeadersMetalink = 0;
01169 }
01170 if (_multi)
01171 {
01172 curl_multi_cleanup(_multi);
01173 _multi = 0;
01174 }
01175 std::map<std::string, CURL *>::iterator it;
01176 for (it = _easypool.begin(); it != _easypool.end(); it++)
01177 {
01178 CURL *easy = it->second;
01179 if (easy)
01180 {
01181 curl_easy_cleanup(easy);
01182 it->second = NULL;
01183 }
01184 }
01185 }
01186
01187 void MediaMultiCurl::setupEasy()
01188 {
01189 MediaCurl::setupEasy();
01190
01191 if (_customHeadersMetalink)
01192 {
01193 curl_slist_free_all(_customHeadersMetalink);
01194 _customHeadersMetalink = 0;
01195 }
01196 struct curl_slist *sl = _customHeaders;
01197 for (; sl; sl = sl->next)
01198 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
01199 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml");
01200 }
01201
01202 static bool looks_like_metalink(const Pathname & file)
01203 {
01204 char buf[256], *p;
01205 int fd, l;
01206 if ((fd = open(file.asString().c_str(), O_RDONLY)) == -1)
01207 return false;
01208 while ((l = read(fd, buf, sizeof(buf) - 1)) == -1 && errno == EINTR)
01209 ;
01210 close(fd);
01211 if (l == -1)
01212 return 0;
01213 buf[l] = 0;
01214 p = buf;
01215 while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
01216 p++;
01217 if (!strncasecmp(p, "<?xml", 5))
01218 {
01219 while (*p && *p != '>')
01220 p++;
01221 if (*p == '>')
01222 p++;
01223 while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
01224 p++;
01225 }
01226 bool ret = !strncasecmp(p, "<metalink", 9) ? true : false;
01227 DBG << "looks_like_metalink(" << file << "): " << ret << endl;
01228 return ret;
01229 }
01230
01231 void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
01232 {
01233 Pathname dest = target.absolutename();
01234 if( assert_dir( dest.dirname() ) )
01235 {
01236 DBG << "assert_dir " << dest.dirname() << " failed" << endl;
01237 Url url(getFileUrl(filename));
01238 ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) );
01239 }
01240 string destNew = target.asString() + ".new.zypp.XXXXXX";
01241 char *buf = ::strdup( destNew.c_str());
01242 if( !buf)
01243 {
01244 ERR << "out of memory for temp file name" << endl;
01245 Url url(getFileUrl(filename));
01246 ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name"));
01247 }
01248
01249 int tmp_fd = ::mkstemp( buf );
01250 if( tmp_fd == -1)
01251 {
01252 free( buf);
01253 ERR << "mkstemp failed for file '" << destNew << "'" << endl;
01254 ZYPP_THROW(MediaWriteException(destNew));
01255 }
01256 destNew = buf;
01257 free( buf);
01258
01259 FILE *file = ::fdopen( tmp_fd, "w" );
01260 if ( !file ) {
01261 ::close( tmp_fd);
01262 filesystem::unlink( destNew );
01263 ERR << "fopen failed for file '" << destNew << "'" << endl;
01264 ZYPP_THROW(MediaWriteException(destNew));
01265 }
01266 DBG << "dest: " << dest << endl;
01267 DBG << "temp: " << destNew << endl;
01268
01269
01270 if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
01271 {
01272 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
01273 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
01274 }
01275 else
01276 {
01277 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
01278 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
01279 }
01280
01281 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
01282 try
01283 {
01284 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options);
01285 }
01286 catch (Exception &ex)
01287 {
01288 ::fclose(file);
01289 filesystem::unlink(destNew);
01290 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
01291 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
01292 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
01293 ZYPP_RETHROW(ex);
01294 }
01295 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
01296 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
01297 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
01298 long httpReturnCode = 0;
01299 CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
01300 if (infoRet == CURLE_OK)
01301 {
01302 DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
01303 if ( httpReturnCode == 304
01304 || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) )
01305 {
01306 DBG << "not modified: " << PathInfo(dest) << endl;
01307 return;
01308 }
01309 }
01310 else
01311 {
01312 WAR << "Could not get the reponse code." << endl;
01313 }
01314
01315 bool ismetalink = false;
01316
01317 char *ptr = NULL;
01318 if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
01319 {
01320 string ct = string(ptr);
01321 if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
01322 ismetalink = true;
01323 }
01324
01325 if (!ismetalink)
01326 {
01327
01328
01329 fflush(file);
01330 if (looks_like_metalink(Pathname(destNew)))
01331 ismetalink = true;
01332 }
01333
01334 if (ismetalink)
01335 {
01336 bool userabort = false;
01337 fclose(file);
01338 file = NULL;
01339 Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
01340 try
01341 {
01342 MetaLinkParser mlp;
01343 mlp.parse(Pathname(destNew));
01344 MediaBlockList bl = mlp.getBlockList();
01345 vector<Url> urls = mlp.getUrls();
01346 DBG << bl << endl;
01347 file = fopen(destNew.c_str(), "w+");
01348 if (!file)
01349 ZYPP_THROW(MediaWriteException(destNew));
01350 if (PathInfo(target).isExist())
01351 {
01352 DBG << "reusing blocks from file " << target << endl;
01353 bl.reuseBlocks(file, target.asString());
01354 DBG << bl << endl;
01355 }
01356 if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
01357 {
01358 DBG << "reusing blocks from file " << failedFile << endl;
01359 bl.reuseBlocks(file, failedFile.asString());
01360 DBG << bl << endl;
01361 filesystem::unlink(failedFile);
01362 }
01363 Pathname df = deltafile();
01364 if (!df.empty())
01365 {
01366 DBG << "reusing blocks from file " << df << endl;
01367 bl.reuseBlocks(file, df.asString());
01368 DBG << bl << endl;
01369 }
01370 try
01371 {
01372 multifetch(filename, file, &urls, &report, &bl);
01373 }
01374 catch (MediaCurlException &ex)
01375 {
01376 userabort = ex.errstr() == "User abort";
01377 ZYPP_RETHROW(ex);
01378 }
01379 }
01380 catch (Exception &ex)
01381 {
01382
01383 if (file)
01384 fclose(file);
01385 file = NULL;
01386 if (PathInfo(destNew).size() >= 63336)
01387 {
01388 ::unlink(failedFile.asString().c_str());
01389 filesystem::hardlinkCopy(destNew, failedFile);
01390 }
01391 if (userabort)
01392 {
01393 filesystem::unlink(destNew);
01394 ZYPP_RETHROW(ex);
01395 }
01396 file = fopen(destNew.c_str(), "w+");
01397 if (!file)
01398 ZYPP_THROW(MediaWriteException(destNew));
01399 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START);
01400 }
01401 }
01402
01403 if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
01404 {
01405 ERR << "Failed to chmod file " << destNew << endl;
01406 }
01407 if (::fclose(file))
01408 {
01409 filesystem::unlink(destNew);
01410 ERR << "Fclose failed for file '" << destNew << "'" << endl;
01411 ZYPP_THROW(MediaWriteException(destNew));
01412 }
01413 if ( rename( destNew, dest ) != 0 )
01414 {
01415 ERR << "Rename failed" << endl;
01416 ZYPP_THROW(MediaWriteException(dest));
01417 }
01418 DBG << "done: " << PathInfo(dest) << endl;
01419 }
01420
01421 void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const
01422 {
01423 Url baseurl(getFileUrl(filename));
01424 if (blklist && filesize == off_t(-1) && blklist->haveFilesize())
01425 filesize = blklist->getFilesize();
01426 if (blklist && !blklist->haveBlocks() && filesize != 0)
01427 blklist = 0;
01428 if (blklist && (filesize == 0 || !blklist->numBlocks()))
01429 {
01430 checkFileDigest(baseurl, fp, blklist);
01431 return;
01432 }
01433 if (filesize == 0)
01434 return;
01435 if (!_multi)
01436 {
01437 _multi = curl_multi_init();
01438 if (!_multi)
01439 ZYPP_THROW(MediaCurlInitException(baseurl));
01440 }
01441 multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize);
01442 req._timeout = _settings.timeout();
01443 req._connect_timeout = _settings.connectTimeout();
01444 req._maxspeed = _settings.maxDownloadSpeed();
01445 req._maxworkers = _settings.maxConcurrentConnections();
01446 if (req._maxworkers > MAXURLS)
01447 req._maxworkers = MAXURLS;
01448 if (req._maxworkers <= 0)
01449 req._maxworkers = 1;
01450 std::vector<Url> myurllist;
01451 for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
01452 {
01453 try
01454 {
01455 string scheme = urliter->getScheme();
01456 if (scheme == "http" || scheme == "https" || scheme == "ftp" || scheme == "tftp")
01457 {
01458 checkProtocol(*urliter);
01459 myurllist.push_back(*urliter);
01460 }
01461 }
01462 catch (...)
01463 {
01464 }
01465 }
01466 if (!myurllist.size())
01467 myurllist.push_back(baseurl);
01468 req.run(myurllist);
01469 checkFileDigest(baseurl, fp, blklist);
01470 }
01471
01472 void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const
01473 {
01474 if (!blklist || !blklist->haveFileChecksum())
01475 return;
01476 if (fseeko(fp, off_t(0), SEEK_SET))
01477 ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
01478 Digest dig;
01479 blklist->createFileDigest(dig);
01480 char buf[4096];
01481 size_t l;
01482 while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
01483 dig.update(buf, l);
01484 if (!blklist->verifyFileDigest(dig))
01485 ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
01486 }
01487
01488 bool MediaMultiCurl::isDNSok(const string &host) const
01489 {
01490 return _dnsok.find(host) == _dnsok.end() ? false : true;
01491 }
01492
01493 void MediaMultiCurl::setDNSok(const string &host) const
01494 {
01495 _dnsok.insert(host);
01496 }
01497
01498 CURL *MediaMultiCurl::fromEasyPool(const string &host) const
01499 {
01500 if (_easypool.find(host) == _easypool.end())
01501 return 0;
01502 CURL *ret = _easypool[host];
01503 _easypool.erase(host);
01504 return ret;
01505 }
01506
01507 void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
01508 {
01509 CURL *oldeasy = _easypool[host];
01510 _easypool[host] = easy;
01511 if (oldeasy)
01512 curl_easy_cleanup(oldeasy);
01513 }
01514
01515 }
01516 }
01517