/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /|  _/ _/                          |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/
/** \file zypp/media/MediaMultiCurl.cc
 */

#include <ctype.h>
#include <sys/types.h>
#include <signal.h>
#include <sys/wait.h>
#include <netdb.h>
#include <arpa/inet.h>
// POSIX headers for calls used below (gettimeofday, fork, pipe, open,
// select, strncasecmp, ...); the original build presumably picked these
// up transitively via the zypp headers.
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <strings.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <vector>
#include <iostream>
#include <algorithm>

#include "zypp/ZConfig.h"
#include "zypp/base/Logger.h"
#include "zypp/media/MediaMultiCurl.h"
#include "zypp/media/MetaLinkParser.h"

using namespace std;
using namespace zypp::base;

#undef CURLVERSION_AT_LEAST
#define CURLVERSION_AT_LEAST(M,N,O) LIBCURL_VERSION_NUM >= ((((M)<<8)+(N))<<8)+(O)

namespace zypp {
namespace media {

class multifetchrequest;

// Hack: we derive from MediaCurl just to get the storage space for
// settings, url, curlerrors and the like

class multifetchworker : MediaCurl {
  friend class multifetchrequest;

public:
  multifetchworker(int no, multifetchrequest &request, const Url &url);
  ~multifetchworker();
  void nextjob();
  void run();
  bool checkChecksum();
  bool recheckChecksum();
  void disableCompetition();

  void checkdns();
  void adddnsfd(fd_set &rset, int &maxfd);
  void dnsevent(fd_set &rset);

  int _workerno;

  int _state;
  bool _competing;

  size_t _blkno;
  off_t _blkstart;
  size_t _blksize;
  bool _noendrange;

  double _blkstarttime;
  size_t _blkreceived;
  off_t _received;

  double _avgspeed;
  double _maxspeed;

  double _sleepuntil;

private:
  void stealjob();

  size_t writefunction(void *ptr, size_t size);
  static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);

  size_t headerfunction(char *ptr, size_t size);
  static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);

  multifetchrequest *_request;
  int _pass;
  string _urlbuf;
  off_t _off;
  size_t _size;
  Digest _dig;

  pid_t _pid;
  int _dnspipe;
};

#define WORKER_STARTING 0
#define WORKER_LOOKUP   1
#define WORKER_FETCH    2
#define WORKER_DISCARD  3
#define WORKER_DONE     4
#define WORKER_SLEEP    5
#define WORKER_BROKEN   6
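// Worker life cycle: a worker starts in WORKER_STARTING, may pass through
// WORKER_LOOKUP while its DNS pre-check runs, then alternates between
// WORKER_FETCH and WORKER_SLEEP (when throttled). WORKER_DISCARD means the
// block is also being fetched by a faster competitor, so the data is only
// checksummed, not written. WORKER_DONE and WORKER_BROKEN are terminal.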
class multifetchrequest {
public:
  multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize);
  ~multifetchrequest();

  void run(std::vector<Url> &urllist);

protected:
  friend class multifetchworker;

  const MediaMultiCurl *_context;
  const Pathname _filename;
  Url _baseurl;

  FILE *_fp;
  callback::SendReport<DownloadProgressReport> *_report;
  MediaBlockList *_blklist;
  off_t _filesize;

  CURLM *_multi;

  std::list<multifetchworker *> _workers;
  bool _stealing;
  bool _havenewjob;

  size_t _blkno;
  off_t _blkoff;
  size_t _activeworkers;
  size_t _lookupworkers;
  size_t _sleepworkers;
  double _minsleepuntil;
  bool _finished;
  off_t _totalsize;
  off_t _fetchedsize;
  off_t _fetchedgoodsize;

  double _starttime;
  double _lastprogress;

  double _lastperiodstart;
  double _lastperiodfetched;
  double _periodavg;

public:
  double _timeout;
  double _connect_timeout;
  double _maxspeed;
  int _maxworkers;
};

#define BLKSIZE 131072
#define MAXURLS 10

static double
currentTime()
{
  struct timeval tv;
  if (gettimeofday(&tv, NULL))
    return 0;
  return tv.tv_sec + tv.tv_usec / 1000000.;
}
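// libcurl write callback (registered via _writefunction below). Returning
// a value different from the passed-in size makes libcurl abort the
// transfer, which is how a worker cancels itself; "return size ? 0 : 1"
// aborts on a non-empty chunk while still reporting success for a
// zero-length one.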
size_t
multifetchworker::writefunction(void *ptr, size_t size)
{
  size_t len, cnt;
  if (_state == WORKER_BROKEN)
    return size ? 0 : 1;

  double now = currentTime();

  len = size > _size ? _size : size;
  if (!len)
    {
      // kill this job?
      return size;
    }

  if (_blkstart && _off == _blkstart)
    {
      // make sure that the server replied with "partial content"
      // for http requests
      char *effurl;
      (void)curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &effurl);
      if (effurl && !strncasecmp(effurl, "http", 4))
        {
          long statuscode = 0;
          (void)curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &statuscode);
          if (statuscode != 206)
            return size ? 0 : 1;
        }
    }

  _blkreceived += len;
  _received += len;

  _request->_lastprogress = now;

  if (_state == WORKER_DISCARD || !_request->_fp)
    {
      // block is no longer needed
      // still calculate the checksum so that we can throw out bad servers
      if (_request->_blklist)
        _dig.update((const char *)ptr, len);
      _off += len;
      _size -= len;
      return size;
    }
  if (fseeko(_request->_fp, _off, SEEK_SET))
    return size ? 0 : 1;
  cnt = fwrite(ptr, 1, len, _request->_fp);
  if (cnt > 0)
    {
      _request->_fetchedsize += cnt;
      if (_request->_blklist)
        _dig.update((const char *)ptr, cnt);
      _off += cnt;
      _size -= cnt;
      if (cnt == len)
        return size;
    }
  return cnt;
}

size_t
multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->writefunction(ptr, size * nmemb);
}

size_t
multifetchworker::headerfunction(char *p, size_t size)
{
  size_t l = size;
  if (l > 9 && !strncasecmp(p, "Location:", 9))
    {
      string line(p + 9, l - 9);
      if (line[l - 10] == '\r')
        line.erase(l - 10, 1);
      DBG << "#" << _workerno << ": redirecting to " << line << endl;
      return size;
    }
  if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
    return size;
  p += 14;
  l -= 14;
  while (l && (*p == ' ' || *p == '\t'))
    p++, l--;
  if (l < 6 || strncasecmp(p, "bytes", 5))
    return size;
  p += 5;
  l -= 5;
  char buf[128];
  memcpy(buf, p, l);
  buf[l] = 0;
  unsigned long long start, off, filesize;
  if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
    return size;
  if (_request->_filesize == (off_t)-1)
    {
      WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
      _request->_filesize = filesize;
      if (_request->_totalsize == 0 && !_request->_blklist)
        _request->_totalsize = filesize;
    }
  if (_request->_filesize != (off_t)filesize)
    {
      DBG << "#" << _workerno << ": filesize mismatch" << endl;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
    }
  return size;
}

size_t
multifetchworker::_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->headerfunction((char *)ptr, size * nmemb);
}
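// Each worker wraps one curl easy handle. The handle is taken from the
// per-host easy pool when possible so an existing connection can be
// reused; credentials are copied only when the mirror host matches the
// request host, mirroring what curl itself does on a redirect.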
multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
: MediaCurl(url, Pathname())
{
  _workerno = no;
  _request = &request;
  _state = WORKER_STARTING;
  _competing = false;
  _off = _blkstart = 0;
  _size = _blksize = 0;
  _pass = 0;
  _blkno = 0;
  _pid = 0;
  _dnspipe = -1;
  _blkreceived = 0;
  _received = 0;
  _blkstarttime = 0;
  _avgspeed = 0;
  _sleepuntil = 0;
  _maxspeed = _request->_maxspeed;
  _noendrange = false;

  Url curlUrl( clearQueryString(url) );
  _urlbuf = curlUrl.asString();
  _curl = _request->_context->fromEasyPool(_url.getHost());
  if (_curl)
    DBG << "reused worker from pool" << endl;
  if (!_curl && !(_curl = curl_easy_init()))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
      return;
    }
  try
    {
      setupEasy();
    }
  catch (Exception &ex)
    {
      curl_easy_cleanup(_curl);
      _curl = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
      return;
    }
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
  curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
  curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
  curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
  if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
    {
      curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
      curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
    }
  // if this is the same host copy authorization
  // (the host check is also what curl does when doing a redirect)
  // (note also that unauthorized exceptions are thrown with the request host)
  if (url.getHost() == _request->_context->_url.getHost())
    {
      _settings.setUsername(_request->_context->_settings.username());
      _settings.setPassword(_request->_context->_settings.password());
      _settings.setAuthType(_request->_context->_settings.authType());
      if ( _settings.userPassword().size() )
        {
          curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
          string use_auth = _settings.authType();
          if (use_auth.empty())
            use_auth = "digest,basic"; // our default
          long auth = CurlAuthData::auth_type_str2long(use_auth);
          if( auth != CURLAUTH_NONE)
            {
              DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
                  << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
              curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
            }
        }
    }
  checkdns();
}

multifetchworker::~multifetchworker()
{
  if (_curl)
    {
      if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
        curl_multi_remove_handle(_request->_multi, _curl);
      if (_state == WORKER_DONE || _state == WORKER_SLEEP)
        {
#if CURLVERSION_AT_LEAST(7,15,5)
          curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
#endif
          curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
          _request->_context->toEasyPool(_url.getHost(), _curl);
        }
      else
        curl_easy_cleanup(_curl);
      _curl = 0;
    }
  if (_pid)
    {
      kill(_pid, SIGKILL);
      int status;
      while (waitpid(_pid, &status, 0) == -1)
        if (errno != EINTR)
          break;
      _pid = 0;
    }
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  // the destructor in MediaCurl doesn't call disconnect() if
  // the media is not attached, so we do it here manually
  disconnectFrom();
}
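// DNS pre-check: name resolution for a mirror is done in a forked child
// (see checkdns below) so a dead DNS server cannot stall the whole
// download. The parent watches the pipe; when the child exits, the pipe
// becomes readable and dnsevent() reaps the result. Numeric addresses,
// already verified hosts and proxied connections skip the check.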
static inline bool env_isset(string name)
{
  const char *s = getenv(name.c_str());
  return s && *s ? true : false;
}

void
multifetchworker::checkdns()
{
  string host = _url.getHost();

  if (host.empty())
    return;

  if (_request->_context->isDNSok(host))
    return;

  // no need to do dns checking for numeric hosts
  char addrbuf[128];
  if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
    return;
  if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
    return;

  // no need to do dns checking if we use a proxy
  if (!_settings.proxy().empty())
    return;
  if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
    return;
  string schemeproxy = _url.getScheme() + "_proxy";
  if (env_isset(schemeproxy))
    return;
  if (schemeproxy != "http_proxy")
    {
      std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
      if (env_isset(schemeproxy))
        return;
    }

  DBG << "checking DNS lookup of " << host << endl;
  int pipefds[2];
  if (pipe(pipefds))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
      return;
    }
  _pid = fork();
  if (_pid == pid_t(-1))
    {
      close(pipefds[0]);
      close(pipefds[1]);
      _pid = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
      return;
    }
  else if (_pid == 0)
    {
      close(pipefds[0]);
      // XXX: close all other file descriptors
      struct addrinfo *ai, aihints;
      memset(&aihints, 0, sizeof(aihints));
      aihints.ai_family = PF_UNSPEC;
      int tstsock = socket(PF_INET6, SOCK_DGRAM | SOCK_CLOEXEC, 0);
      if (tstsock == -1)
        aihints.ai_family = PF_INET;
      else
        close(tstsock);
      aihints.ai_socktype = SOCK_STREAM;
      aihints.ai_flags = AI_CANONNAME;
      unsigned int connecttimeout = _request->_connect_timeout;
      if (connecttimeout)
        alarm(connecttimeout);
      signal(SIGALRM, SIG_DFL);
      if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
        _exit(1);
      _exit(0);
    }
  close(pipefds[1]);
  _dnspipe = pipefds[0];
  _state = WORKER_LOOKUP;
}

void
multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
{
  if (_state != WORKER_LOOKUP)
    return;
  FD_SET(_dnspipe, &rset);
  if (maxfd < _dnspipe)
    maxfd = _dnspipe;
}

void
multifetchworker::dnsevent(fd_set &rset)
{
  if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
    return;
  int status;
  while (waitpid(_pid, &status, 0) == -1)
    {
      if (errno != EINTR)
        return;
    }
  _pid = 0;
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  if (!WIFEXITED(status))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  int exitcode = WEXITSTATUS(status);
  DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
  if (exitcode != 0)
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  _request->_context->setDNSok(_url.getHost());
  nextjob();
}
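// Block verification: checkChecksum() verifies the digest that was updated
// on the fly while the block streamed in; recheckChecksum() instead
// re-reads the block from the target file, which matters when several
// competing workers wrote into the same region.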
bool
multifetchworker::checkChecksum()
{
  // DBG << "checkChecksum block " << _blkno << endl;
  if (!_blksize || !_request->_blklist)
    return true;
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

bool
multifetchworker::recheckChecksum()
{
  // DBG << "recheckChecksum block " << _blkno << endl;
  if (!_request->_fp || !_blksize || !_request->_blklist)
    return true;
  if (fseeko(_request->_fp, _blkstart, SEEK_SET))
    return false;
  char buf[4096];
  size_t l = _blksize;
  _request->_blklist->createDigest(_dig); // resets digest
  while (l)
    {
      size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
      if (fread(buf, cnt, 1, _request->_fp) != 1)
        return false;
      _dig.update(buf, cnt);
      l -= cnt;
    }
  return _request->_blklist->verifyDigest(_blkno, _dig);
}
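// Job stealing: once no unassigned blocks remain, an idle worker picks the
// block of the slowest remaining worker (preferring blocks with the lowest
// pass count) and fetches it in parallel. If the current owner is expected
// to finish its block sooner anyway, the thief naps briefly instead of
// competing.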
void
multifetchworker::stealjob()
{
  if (!_request->_stealing)
    {
      DBG << "start stealing!" << endl;
      _request->_stealing = true;
    }
  multifetchworker *best = 0;
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  double now = 0;
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_pass == -1)
        continue; // do not steal!
      if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
        continue; // do not steal finished jobs
      if (!worker->_avgspeed && worker->_blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > worker->_blkstarttime)
            worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
        }
      if (!best || best->_pass > worker->_pass)
        {
          best = worker;
          continue;
        }
      if (best->_pass < worker->_pass)
        continue;
      // if it is the same block, we want to know the best worker, otherwise the worst
      if (worker->_blkstart == best->_blkstart)
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
      else
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
    }
  if (!best)
    {
      _state = WORKER_DONE;
      _request->_activeworkers--;
      _request->_finished = true;
      return;
    }
  // do not sleep twice
  if (_state != WORKER_SLEEP)
    {
      if (!_avgspeed && _blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > _blkstarttime)
            _avgspeed = _blkreceived / (now - _blkstarttime);
        }

      // let's see if we should sleep a bit
      DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
      DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
      if (_avgspeed && best->_avgspeed && best->_blksize - best->_blkreceived > 0 &&
          (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
        {
          if (!now)
            now = currentTime();
          double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
          if (sl > 1)
            sl = 1;
          DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
          _sleepuntil = now + sl;
          _state = WORKER_SLEEP;
          _request->_sleepworkers++;
          return;
        }
    }

  _competing = true;
  best->_competing = true;
  _blkstart = best->_blkstart;
  _blksize = best->_blksize;
  best->_pass++;
  _pass = best->_pass;
  _blkno = best->_blkno;
  run();
}

void
multifetchworker::disableCompetition()
{
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_blkstart == _blkstart)
        {
          if (worker->_state == WORKER_FETCH)
            worker->_state = WORKER_DISCARD;
          worker->_pass = -1; /* do not steal this one, we already have it */
        }
    }
}

void
multifetchworker::nextjob()
{
  _noendrange = false;
  if (_request->_stealing)
    {
      stealjob();
      return;
    }

  MediaBlockList *blklist = _request->_blklist;
  if (!blklist)
    {
      _blksize = BLKSIZE;
      if (_request->_filesize != off_t(-1))
        {
          if (_request->_blkoff >= _request->_filesize)
            {
              stealjob();
              return;
            }
          _blksize = _request->_filesize - _request->_blkoff;
          if (_blksize > BLKSIZE)
            _blksize = BLKSIZE;
        }
    }
  else
    {
      MediaBlock blk = blklist->getBlock(_request->_blkno);
      while (_request->_blkoff >= (off_t)(blk.off + blk.size))
        {
          if (++_request->_blkno == blklist->numBlocks())
            {
              stealjob();
              return;
            }
          blk = blklist->getBlock(_request->_blkno);
          _request->_blkoff = blk.off;
        }
      _blksize = blk.off + blk.size - _request->_blkoff;
      if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
        _blksize = BLKSIZE;
    }
  _blkno = _request->_blkno;
  _blkstart = _request->_blkoff;
  _request->_blkoff += _blksize;
  run();
}
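// run() issues the actual HTTP range request for the current block. In
// _noendrange mode only the range start is sent ("start-"); this is the
// retry path taken when a server rejected a fully specified range with
// HTTP 416.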
void
multifetchworker::run()
{
  char rangebuf[128];

  if (_state == WORKER_BROKEN || _state == WORKER_DONE)
    return; // just in case...
  if (_noendrange)
    sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
  else
    sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
  DBG << "#" << _workerno << ": BLK " << _blkno << ": " << rangebuf << " " << _url << endl;
  if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
      return;
    }
  if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
      return;
    }
  _request->_havenewjob = true;
  _off = _blkstart;
  _size = _blksize;
  if (_request->_blklist)
    _request->_blklist->createDigest(_dig); // resets digest
  _state = WORKER_FETCH;

  double now = currentTime();
  _blkstarttime = now;
  _blkreceived = 0;
}

multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize)
: _context(context), _filename(filename), _baseurl(baseurl)
{
  _fp = fp;
  _report = report;
  _blklist = blklist;
  _filesize = filesize;
  _multi = multi;
  _stealing = false;
  _havenewjob = false;
  _blkno = 0;
  if (_blklist)
    _blkoff = _blklist->getBlock(0).off;
  else
    _blkoff = 0;
  _activeworkers = 0;
  _lookupworkers = 0;
  _sleepworkers = 0;
  _minsleepuntil = 0;
  _finished = false;
  _fetchedsize = 0;
  _fetchedgoodsize = 0;
  _totalsize = 0;
  _lastperiodstart = _lastprogress = _starttime = currentTime();
  _lastperiodfetched = 0;
  _periodavg = 0;
  _timeout = 0;
  _connect_timeout = 0;
  _maxspeed = 0;
  _maxworkers = 0;
  if (blklist)
    {
      for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
        {
          MediaBlock blk = blklist->getBlock(blkno);
          _totalsize += blk.size;
        }
    }
  else if (filesize != off_t(-1))
    _totalsize = filesize;
}

multifetchrequest::~multifetchrequest()
{
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      *workeriter = NULL;
      delete worker;
    }
  _workers.clear();
}
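// The scheduler: a single select()/curl_multi_perform() loop drives all
// workers. Each iteration spawns new workers while URLs and worker slots
// remain, multiplexes DNS pipes and curl sockets in one select(), wakes
// sleeping workers whose nap expired, and reschedules finished transfers.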
void
multifetchrequest::run(std::vector<Url> &urllist)
{
  int workerno = 0;
  std::vector<Url>::iterator urliter = urllist.begin();
  for (;;)
    {
      fd_set rset, wset, xset;
      int maxfd, nqueue;

      if (_finished)
        {
          DBG << "finished!" << endl;
          break;
        }

      if ((int)_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
        {
          // spawn another worker!
          multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
          _workers.push_back(worker);
          if (worker->_state != WORKER_BROKEN)
            {
              _activeworkers++;
              if (worker->_state != WORKER_LOOKUP)
                {
                  worker->nextjob();
                }
              else
                _lookupworkers++;
            }
          ++urliter;
          continue;
        }
      if (!_activeworkers)
        {
          WAR << "No more active workers!" << endl;
          // show the first worker error we find
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              if ((*workeriter)->_state != WORKER_BROKEN)
                continue;
              ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
            }
          break;
        }

      FD_ZERO(&rset);
      FD_ZERO(&wset);
      FD_ZERO(&xset);

      curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);

      if (_lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          (*workeriter)->adddnsfd(rset, maxfd);

      timeval tv;
      // if we added a new job we have to call multi_perform once
      // to make it show up in the fd set. do not sleep in this case.
      tv.tv_sec = 0;
      tv.tv_usec = _havenewjob ? 0 : 200000;
      if (_sleepworkers && !_havenewjob)
        {
          if (_minsleepuntil == 0)
            {
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *worker = *workeriter;
                  if (worker->_state != WORKER_SLEEP)
                    continue;
                  if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
                    _minsleepuntil = worker->_sleepuntil;
                }
            }
          double sl = _minsleepuntil - currentTime();
          if (sl < 0)
            {
              sl = 0;
              _minsleepuntil = 0;
            }
          if (sl < .2)
            tv.tv_usec = sl * 1000000;
        }
      int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
      if (r == -1 && errno != EINTR)
        ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
      if (r != 0 && _lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          {
            multifetchworker *worker = *workeriter;
            if (worker->_state != WORKER_LOOKUP)
              continue;
            (*workeriter)->dnsevent(rset);
            if (worker->_state != WORKER_LOOKUP)
              _lookupworkers--;
          }
      _havenewjob = false;

      // run curl
      for (;;)
        {
          CURLMcode mcode;
          int tasks;
          mcode = curl_multi_perform(_multi, &tasks);
          if (mcode == CURLM_CALL_MULTI_PERFORM)
            continue;
          if (mcode != CURLM_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
          break;
        }

      double now = currentTime();

      // update periodavg
      if (now > _lastperiodstart + .5)
        {
          if (!_periodavg)
            _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
          else
            _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
          _lastperiodfetched = _fetchedsize;
          _lastperiodstart = now;
        }

      // wake up sleepers
      if (_sleepworkers)
        {
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              multifetchworker *worker = *workeriter;
              if (worker->_state != WORKER_SLEEP)
                continue;
              if (worker->_sleepuntil > now)
                continue;
              if (_minsleepuntil == worker->_sleepuntil)
                _minsleepuntil = 0;
              DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
              _sleepworkers--;
              // nextjob changes the state
              worker->nextjob();
            }
        }
      // collect all curl results, reschedule new jobs
      CURLMsg *msg;
      while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
        {
          if (msg->msg != CURLMSG_DONE)
            continue;
          CURL *easy = msg->easy_handle;
          CURLcode cc = msg->data.result;
          multifetchworker *worker;
          if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
          if (worker->_blkreceived && now > worker->_blkstarttime)
            {
              if (worker->_avgspeed)
                worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
              else
                worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
            }
          DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
          curl_multi_remove_handle(_multi, easy);
          if (cc == CURLE_HTTP_RETURNED_ERROR)
            {
              long statuscode = 0;
              (void)curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &statuscode);
              DBG << "HTTP status " << statuscode << endl;
              if (statuscode == 416 && !_blklist) /* Range error */
                {
                  if (_filesize == off_t(-1))
                    {
                      if (!worker->_noendrange)
                        {
                          DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
                          worker->_noendrange = true;
                          worker->run();
                          continue;
                        }
                      worker->_noendrange = false;
                      worker->stealjob();
                      continue;
                    }
                  if (worker->_blkstart >= _filesize)
                    {
                      worker->nextjob();
                      continue;
                    }
                }
            }
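          // A transfer finished without a curl-level error: verify the
          // block, resolve any competition on it, then decide whether this
          // worker is slow enough to deserve a nap or whether the
          // per-worker rate limit needs rebalancing before it gets its
          // next job.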
          if (cc == 0)
            {
              if (!worker->checkChecksum())
                {
                  WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
                  worker->_state = WORKER_BROKEN;
                  strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
                  _activeworkers--;
                  continue;
                }
              if (worker->_state == WORKER_FETCH)
                {
                  if (worker->_competing)
                    {
                      worker->disableCompetition();
                      // multiple workers wrote into this block. We already know that our
                      // data was correct, but maybe some other worker overwrote our data
                      // with something broken. Thus we have to re-check the block.
                      if (!worker->recheckChecksum())
                        {
                          DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
                          // re-fetch! No need to worry about the bad workers,
                          // they will now be set to DISCARD. At the end of their block
                          // they will notice that they wrote bad data and go into BROKEN.
                          worker->run();
                          continue;
                        }
                    }
                  _fetchedgoodsize += worker->_blksize;
                }

              // make bad workers sleep a little
              double maxavg = 0;
              int maxworkerno = 0;
              int numbetter = 0;
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *oworker = *workeriter;
                  if (oworker->_state == WORKER_BROKEN)
                    continue;
                  if (oworker->_avgspeed > maxavg)
                    {
                      maxavg = oworker->_avgspeed;
                      maxworkerno = oworker->_workerno;
                    }
                  if (oworker->_avgspeed > worker->_avgspeed)
                    numbetter++;
                }
              if (maxavg && !_stealing)
                {
                  double ratio = worker->_avgspeed / maxavg;
                  ratio = 1 - ratio;
                  if (numbetter < 3) // don't sleep that much if we're in the top two
                    ratio = ratio * ratio;
                  if (ratio > .01)
                    {
                      DBG << "#" << worker->_workerno << ": too slow (" << ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
                      worker->_sleepuntil = now + ratio;
                      worker->_state = WORKER_SLEEP;
                      _sleepworkers++;
                      continue;
                    }
                }

              // do rate control (if requested)
              // should use periodavg, but that's not what libcurl does
              if (_maxspeed && now > _starttime)
                {
                  double avg = _fetchedsize / (now - _starttime);
                  avg = worker->_maxspeed * _maxspeed / avg;
                  if (avg < _maxspeed / _maxworkers)
                    avg = _maxspeed / _maxworkers;
                  if (avg > _maxspeed)
                    avg = _maxspeed;
                  if (avg < 1024)
                    avg = 1024;
                  worker->_maxspeed = avg;
#if CURLVERSION_AT_LEAST(7,15,5)
                  curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
#endif
                }

              worker->nextjob();
            }
          else
            {
              worker->_state = WORKER_BROKEN;
              _activeworkers--;
              if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
                {
                  // end of workers reached! goodbye!
                  worker->evaluateCurlCode(Pathname(), cc, false);
                }
            }
        }
      // send report
      if (_report)
        {
          int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
          double avg = 0;
          if (now > _starttime)
            avg = _fetchedsize / (now - _starttime);
          if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
            ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
        }

      if (_timeout && now - _lastprogress > _timeout)
        break;
    }

  if (!_finished)
    ZYPP_THROW(MediaTimeoutException(_baseurl));

  // print some download stats
  WAR << "overall result" << endl;
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
    }
}

MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
: MediaCurl(url_r, attach_point_hint_r)
{
  MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
  _multi = 0;
  _customHeadersMetalink = 0;
}

MediaMultiCurl::~MediaMultiCurl()
{
  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  if (_multi)
    {
      curl_multi_cleanup(_multi);
      _multi = 0;
    }
  std::map<std::string, CURL *>::iterator it;
  for (it = _easypool.begin(); it != _easypool.end(); it++)
    {
      CURL *easy = it->second;
      if (easy)
        {
          curl_easy_cleanup(easy);
          it->second = NULL;
        }
    }
}

void MediaMultiCurl::setupEasy()
{
  MediaCurl::setupEasy();

  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  struct curl_slist *sl = _customHeaders;
  for (; sl; sl = sl->next)
    _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
  _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml");
}
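// Cheap content sniffing: a metalink document is XML whose root element is
// <metalink>, so reading the first 255 bytes of the file is enough to
// recognize one even when a proxy dropped the content type (bnc#649925).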
static bool looks_like_metalink(const Pathname & file)
{
  char buf[256], *p;
  int fd, l;
  if ((fd = open(file.asString().c_str(), O_RDONLY|O_CLOEXEC)) == -1)
    return false;
  while ((l = read(fd, buf, sizeof(buf) - 1)) == -1 && errno == EINTR)
    ;
  close(fd);
  if (l == -1)
    return false;
  buf[l] = 0;
  p = buf;
  while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
    p++;
  if (!strncasecmp(p, "<?xml", 5))
    {
      while (*p && *p != '>')
        p++;
      if (*p == '>')
        p++;
      while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
        p++;
    }
  bool ret = !strncasecmp(p, "<metalink", 9) ? true : false;
  DBG << "looks_like_metalink(" << file << "): " << ret << endl;
  return ret;
}
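// Download entry point: fetch into a mkostemp()-created temp file next to
// the target, using an If-Modified-Since condition when the target already
// exists and an Accept header that invites the server to answer with a
// metalink document instead of the payload.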
void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
{
  Pathname dest = target.absolutename();
  if( assert_dir( dest.dirname() ) )
    {
      DBG << "assert_dir " << dest.dirname() << " failed" << endl;
      Url url(getFileUrl(filename));
      ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) );
    }
  string destNew = target.asString() + ".new.zypp.XXXXXX";
  char *buf = ::strdup( destNew.c_str());
  if( !buf)
    {
      ERR << "out of memory for temp file name" << endl;
      Url url(getFileUrl(filename));
      ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name"));
    }

  int tmp_fd = ::mkostemp( buf, O_CLOEXEC );
  if( tmp_fd == -1)
    {
      free( buf);
      ERR << "mkostemp failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
  destNew = buf;
  free( buf);

  FILE *file = ::fdopen( tmp_fd, "we" );
  if ( !file )
    {
      ::close( tmp_fd);
      filesystem::unlink( destNew );
      ERR << "fopen failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
  DBG << "dest: " << dest << endl;
  DBG << "temp: " << destNew << endl;

  // set IFMODSINCE time condition (no download if not modified)
  if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
    {
      curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
      curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
    }
  else
    {
      curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
      curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
    }
  // change header to include Accept: metalink
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
  try
    {
      MediaCurl::doGetFileCopyFile(filename, dest, file, report, options);
    }
  catch (Exception &ex)
    {
      ::fclose(file);
      filesystem::unlink(destNew);
      curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
      curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
      curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
      ZYPP_RETHROW(ex);
    }
  curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
  curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
  long httpReturnCode = 0;
  CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
  if (infoRet == CURLE_OK)
    {
      DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
      if ( httpReturnCode == 304
           || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
        {
          DBG << "not modified: " << PathInfo(dest) << endl;
          return;
        }
    }
  else
    {
      WAR << "Could not get the response code." << endl;
    }

  bool ismetalink = false;

  char *ptr = NULL;
  if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
    {
      string ct = string(ptr);
      if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
        ismetalink = true;
    }

  if (!ismetalink)
    {
      // some proxies do not store the content type, so also look at the file to find
      // out if we received a metalink (bnc#649925)
      fflush(file);
      if (looks_like_metalink(Pathname(destNew)))
        ismetalink = true;
    }
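  // If we received a metalink instead of the payload, parse it, seed the
  // block list with data that can be reused from the old target, a
  // previously failed download, or a delta file, and hand the real
  // transfer to multifetch(). On failure the code falls back to a plain
  // download, keeping large partial results in MultiCurl.failed for the
  // next attempt.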
  if (ismetalink)
    {
      bool userabort = false;
      fclose(file);
      file = NULL;
      Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
      try
        {
          MetaLinkParser mlp;
          mlp.parse(Pathname(destNew));
          MediaBlockList bl = mlp.getBlockList();
          vector<Url> urls = mlp.getUrls();
          DBG << bl << endl;
          file = fopen(destNew.c_str(), "w+e");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          if (PathInfo(target).isExist())
            {
              DBG << "reusing blocks from file " << target << endl;
              bl.reuseBlocks(file, target.asString());
              DBG << bl << endl;
            }
          if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
            {
              DBG << "reusing blocks from file " << failedFile << endl;
              bl.reuseBlocks(file, failedFile.asString());
              DBG << bl << endl;
              filesystem::unlink(failedFile);
            }
          Pathname df = deltafile();
          if (!df.empty())
            {
              DBG << "reusing blocks from file " << df << endl;
              bl.reuseBlocks(file, df.asString());
              DBG << bl << endl;
            }
          try
            {
              multifetch(filename, file, &urls, &report, &bl);
            }
          catch (MediaCurlException &ex)
            {
              userabort = ex.errstr() == "User abort";
              ZYPP_RETHROW(ex);
            }
        }
      catch (Exception &ex)
        {
          // something went wrong. fall back to normal download
          if (file)
            fclose(file);
          file = NULL;
          if (PathInfo(destNew).size() >= 63336)
            {
              ::unlink(failedFile.asString().c_str());
              filesystem::hardlinkCopy(destNew, failedFile);
            }
          if (userabort)
            {
              filesystem::unlink(destNew);
              ZYPP_RETHROW(ex);
            }
          file = fopen(destNew.c_str(), "w+e");
          if (!file)
            ZYPP_THROW(MediaWriteException(destNew));
          MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START);
        }
    }

  if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
    {
      ERR << "Failed to chmod file " << destNew << endl;
    }
  if (::fclose(file))
    {
      filesystem::unlink(destNew);
      ERR << "Fclose failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
  if ( rename( destNew, dest ) != 0 )
    {
      ERR << "Rename failed" << endl;
      ZYPP_THROW(MediaWriteException(dest));
    }
  DBG << "done: " << PathInfo(dest) << endl;
}
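// multifetch() sets up a multifetchrequest from the media settings, filters
// the mirror list down to http/https/ftp URLs (falling back to the base URL
// if none survive), runs the scheduler, and verifies the complete file
// digest afterwards.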
void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const
{
  Url baseurl(getFileUrl(filename));
  if (blklist && filesize == off_t(-1) && blklist->haveFilesize())
    filesize = blklist->getFilesize();
  if (blklist && !blklist->haveBlocks() && filesize != 0)
    blklist = 0;
  if (blklist && (filesize == 0 || !blklist->numBlocks()))
    {
      checkFileDigest(baseurl, fp, blklist);
      return;
    }
  if (filesize == 0)
    return;
  if (!_multi)
    {
      _multi = curl_multi_init();
      if (!_multi)
        ZYPP_THROW(MediaCurlInitException(baseurl));
    }
  multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize);
  req._timeout = _settings.timeout();
  req._connect_timeout = _settings.connectTimeout();
  req._maxspeed = _settings.maxDownloadSpeed();
  req._maxworkers = _settings.maxConcurrentConnections();
  if (req._maxworkers > MAXURLS)
    req._maxworkers = MAXURLS;
  if (req._maxworkers <= 0)
    req._maxworkers = 1;
  std::vector<Url> myurllist;
  for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
    {
      try
        {
          string scheme = urliter->getScheme();
          if (scheme == "http" || scheme == "https" || scheme == "ftp")
            {
              checkProtocol(*urliter);
              myurllist.push_back(*urliter);
            }
        }
      catch (...)
        {
        }
    }
  if (!myurllist.size())
    myurllist.push_back(baseurl);
  req.run(myurllist);
  checkFileDigest(baseurl, fp, blklist);
}

void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const
{
  if (!blklist || !blklist->haveFileChecksum())
    return;
  if (fseeko(fp, off_t(0), SEEK_SET))
    ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
  Digest dig;
  blklist->createFileDigest(dig);
  char buf[4096];
  size_t l;
  while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
    dig.update(buf, l);
  if (!blklist->verifyFileDigest(dig))
    ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
}

bool MediaMultiCurl::isDNSok(const string &host) const
{
  return _dnsok.find(host) == _dnsok.end() ? false : true;
}

void MediaMultiCurl::setDNSok(const string &host) const
{
  _dnsok.insert(host);
}

CURL *MediaMultiCurl::fromEasyPool(const string &host) const
{
  if (_easypool.find(host) == _easypool.end())
    return 0;
  CURL *ret = _easypool[host];
  _easypool.erase(host);
  return ret;
}

void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
{
  CURL *oldeasy = _easypool[host];
  _easypool[host] = easy;
  if (oldeasy)
    curl_easy_cleanup(oldeasy);
}

} // namespace media
} // namespace zypp