libzypp 8.13.6
|
/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /|  _/  _/                         |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/

#include <ctype.h>
#include <sys/types.h>
#include <signal.h>
#include <sys/wait.h>
#include <netdb.h>
#include <arpa/inet.h>

#include <vector>
#include <iostream>
#include <algorithm>


#include "zypp/ZConfig.h"
#include "zypp/base/Logger.h"
#include "zypp/media/MediaMultiCurl.h"
#include "zypp/media/MetaLinkParser.h"

using namespace std;
using namespace zypp::base;

namespace zypp {
namespace media {


class multifetchrequest;

// Hack: we derive from MediaCurl just to get the storage space for
// settings, url, curlerrors and the like

// One download worker: a single curl easy handle that fetches one
// byte-range ("block") of the target file at a time on behalf of a
// multifetchrequest.  Workers may "steal" blocks from slower peers.
class multifetchworker : MediaCurl {
  friend class multifetchrequest;

public:
  multifetchworker(int no, multifetchrequest &request, const Url &url);
  ~multifetchworker();
  void nextjob();                       // pick the next unfetched block and start it
  void run();                           // submit the current block to the curl multi handle
  bool checkChecksum();                 // verify the digest accumulated while downloading
  bool recheckChecksum();               // re-read the block from the file and verify again
  void disableCompetition();            // mark competing workers on the same block as DISCARD

  void checkdns();                      // asynchronous DNS pre-check via a forked child
  void adddnsfd(fd_set &rset, int &maxfd);
  void dnsevent(fd_set &rset);

  int _workerno;                        // ordinal used in log messages ("#n")

  int _state;                           // one of the WORKER_* states below
  bool _competing;                      // true if another worker works on the same block

  size_t _blkno;                        // index of the current block in the block list
  off_t _blkstart;                      // file offset of the current block
  size_t _blksize;                      // size of the current block
  bool _noendrange;                     // retry flag: send "start-" instead of "start-end"

  double _blkstarttime;                 // when the current block transfer started
  size_t _blkreceived;                  // bytes received for the current block
  off_t _received;                      // bytes received over the worker's lifetime

  double _avgspeed;                     // smoothed bytes/sec, used for job stealing
  double _maxspeed;                     // per-worker rate limit (CURLOPT_MAX_RECV_SPEED_LARGE)

  double _sleepuntil;                   // wakeup time while in WORKER_SLEEP

private:
  void stealjob();                      // take over (part of) a slower worker's block

  size_t writefunction(void *ptr, size_t size);
  static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);

  size_t headerfunction(char *ptr, size_t size);
  static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);

  multifetchrequest *_request;          // owning request (not owned)
  int _pass;                            // stealing generation; -1 = do not steal this job
  string _urlbuf;                       // keeps CURLOPT_URL string storage alive
  off_t _off;                           // current write offset within the file
  size_t _size;                         // bytes still expected for the current block
  Digest _dig;                          // running checksum of the current block

  pid_t _pid;                           // pid of the forked DNS checker child (0 = none)
  int _dnspipe;                         // read end of the DNS child's pipe (-1 = none)
};

// worker state machine
#define WORKER_STARTING 0
#define WORKER_LOOKUP 1
#define WORKER_FETCH 2
#define WORKER_DISCARD 3
#define WORKER_DONE 4
#define WORKER_SLEEP 5
#define WORKER_BROKEN 6



// A whole multi-URL download: owns the worker list, drives the curl
// multi handle, schedules blocks and reports progress.
class multifetchrequest {
public:
  multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize);
  ~multifetchrequest();

  void run(std::vector<Url> &urllist);

protected:
  friend class multifetchworker;

  const MediaMultiCurl *_context;
  const Pathname _filename;
  Url _baseurl;

  FILE *_fp;                            // output file (may be NULL)
  callback::SendReport<DownloadProgressReport> *_report;
  MediaBlockList *_blklist;             // metalink block list (may be NULL)
  off_t _filesize;                      // expected file size, (off_t)-1 if unknown

  CURLM *_multi;                        // shared curl multi handle

  std::list<multifetchworker *> _workers;
  bool _stealing;                       // true once all blocks are handed out
  bool _havenewjob;                     // a handle was added; call multi_perform without sleeping

  size_t _blkno;                        // next block to hand out
  off_t _blkoff;                        // next offset to hand out
  size_t _activeworkers;
  size_t _lookupworkers;                // workers currently in WORKER_LOOKUP
  size_t _sleepworkers;                 // workers currently in WORKER_SLEEP
  double _minsleepuntil;                // earliest wakeup among sleepers (0 = recompute)
  bool _finished;
  off_t _totalsize;
  off_t _fetchedsize;                   // bytes written (including later-discarded data)
  off_t _fetchedgoodsize;               // bytes whose checksum verified

  double _starttime;
  double _lastprogress;                 // used for the overall timeout check

  // data for the periodic (0.5s) download-rate average
  double _lastperiodstart;
  double _lastperiodfetched;
  double _periodavg;

public:
  double _timeout;
  double _connect_timeout;
  double _maxspeed;                     // overall rate limit in bytes/sec (0 = none)
  int _maxworkers;
};

#define BLKSIZE 131072
#define MAXURLS 10



// current wall-clock time in seconds (definition continues below)
static double
currentTime()
{
  struct timeval tv;
  if (gettimeofday(&tv, NULL))
    return 0;
  return tv.tv_sec + tv.tv_usec / 1000000.;
}

// libcurl write callback (per worker).  Returning a value different
// from 'size' makes libcurl abort the transfer; "size ? 0 : 1" always
// differs from 'size' and is used below to signal an error.
size_t
multifetchworker::writefunction(void *ptr, size_t size)
{
  size_t len, cnt;
  if (_state == WORKER_BROKEN)
    return size ? 0 : 1;

  double now = currentTime();

  // never write more than the bytes still expected for this block
  len = size > _size ? _size : size;
  if (!len)
    {
      // kill this job?
      return size;
    }

  if (_blkstart && _off == _blkstart)
    {
      // make sure that the server replied with "partial content"
      // for http requests
      char *effurl;
      (void)curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &effurl);
      if (effurl && !strncasecmp(effurl, "http", 4))
        {
          long statuscode = 0;
          (void)curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &statuscode);
          if (statuscode != 206)
            return size ? 0 : 1;
        }
    }

  _blkreceived += len;
  _received += len;

  _request->_lastprogress = now;

  if (_state == WORKER_DISCARD || !_request->_fp)
    {
      // block is no longer needed
      // still calculate the checksum so that we can throw out bad servers
      if (_request->_blklist)
        _dig.update((const char *)ptr, len);
      _off += len;
      _size -= len;
      return size;
    }
  // position the shared FILE* at this worker's offset before writing
  if (fseeko(_request->_fp, _off, SEEK_SET))
    return size ? 0 : 1;
  cnt = fwrite(ptr, 1, len, _request->_fp);
  if (cnt > 0)
    {
      _request->_fetchedsize += cnt;
      if (_request->_blklist)
        _dig.update((const char *)ptr, cnt);
      _off += cnt;
      _size -= cnt;
      if (cnt == len)
        return size;
    }
  // short write: returning cnt (!= size) aborts the transfer
  return cnt;
}

// static trampoline: CURLOPT_WRITEDATA carries the worker pointer
size_t
multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->writefunction(ptr, size * nmemb);
}

// libcurl header callback: parses "Content-Range: bytes a-b/total" to
// learn (and cross-check) the total file size when it is unknown.
size_t
multifetchworker::headerfunction(char *p, size_t size)
{
  size_t l = size;
  if (l > 9 && !strncasecmp(p, "Location:", 9))
    {
      string line(p + 9, l - 9);
      if (line[l - 10] == '\r')
        line.erase(l - 10, 1);
      DBG << "#" << _workerno << ": redirecting to" << line << endl;
      return size;
    }
  if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
    return size;
  p += 14;
  l -= 14;
  while (l && (*p == ' ' || *p == '\t'))
    p++, l--;
  if (l < 6 || strncasecmp(p, "bytes", 5))
    return size;
  p += 5;
  l -= 5;
  char buf[128];
  memcpy(buf, p, l);
  buf[l] = 0;
  unsigned long long start, off, filesize;
  if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
    return size;
  if (_request->_filesize == (off_t)-1)
    {
      WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
      _request->_filesize = filesize;
      if (_request->_totalsize == 0 && !_request->_blklist)
        _request->_totalsize = filesize;
    }
  if (_request->_filesize != (off_t)filesize)
    {
      // server disagrees about the total size: take this mirror out of the pool
      DBG << "#" << _workerno << ": filesize mismatch" << endl;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
    }
  return size;
}

// static trampoline: CURLOPT_HEADERDATA carries the worker pointer
size_t
multifetchworker::_headerfunction(void
*ptr, size_t size, size_t nmemb, void *stream)
{
  multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
  return me->headerfunction((char *)ptr, size * nmemb);
}

// Construct a worker for one mirror URL: reuse or create a curl easy
// handle, wire up the callbacks and kick off the async DNS pre-check.
// On any setup failure the worker is left in WORKER_BROKEN.
multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
: MediaCurl(url, Pathname())
{
  _workerno = no;
  _request = &request;
  _state = WORKER_STARTING;
  _competing = false;
  _off = _blkstart = 0;
  _size = _blksize = 0;
  _pass = 0;
  _blkno = 0;
  _pid = 0;
  _dnspipe = -1;
  _blkreceived = 0;
  _received = 0;
  _blkstarttime = 0;
  _avgspeed = 0;
  _sleepuntil = 0;
  _maxspeed = _request->_maxspeed;
  _noendrange = false;

  Url curlUrl( clearQueryString(url) );
  _urlbuf = curlUrl.asString();
  // try to reuse a pooled easy handle for this host first
  _curl = _request->_context->fromEasyPool(_url.getHost());
  if (_curl)
    DBG << "reused worker from pool" << endl;
  if (!_curl && !(_curl = curl_easy_init()))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
      return;
    }
  try
    {
      setupEasy();
    }
  catch (Exception &ex)
    {
      curl_easy_cleanup(_curl);
      _curl = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
      return;
    }
  // CURLOPT_PRIVATE lets the multi loop map a finished easy handle back to us
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
  curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
  curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
  curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
  // only watch headers when we still need to learn the file size
  if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
    {
      curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
      curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
    }
  // if this is the same host copy authorization
  // (the host check is also what curl does when doing a redirect)
  // (note also that unauthorized exceptions are thrown with the request host)
  if (url.getHost() == _request->_context->_url.getHost())
    {
      _settings.setUsername(_request->_context->_settings.username());
      _settings.setPassword(_request->_context->_settings.password());
      _settings.setAuthType(_request->_context->_settings.authType());
      if ( _settings.userPassword().size() )
        {
          curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
          string use_auth = _settings.authType();
          if (use_auth.empty())
            use_auth = "digest,basic";        // our default
          long auth = CurlAuthData::auth_type_str2long(use_auth);
          if( auth != CURLAUTH_NONE)
            {
              DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
                  << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
              curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
            }
        }
    }
  checkdns();
}

// Tear down the worker: either park the easy handle back into the
// host pool (after detaching all callbacks) or clean it up, and reap
// a still-running DNS checker child.
multifetchworker::~multifetchworker()
{
  if (_curl)
    {
      if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
        curl_multi_remove_handle(_request->_multi, _curl);
      if (_state == WORKER_DONE || _state == WORKER_SLEEP)
        {
          // healthy handle: strip our settings/pointers and pool it for reuse
          curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
          curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
          curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
          _request->_context->toEasyPool(_url.getHost(), _curl);
        }
      else
        curl_easy_cleanup(_curl);
      _curl = 0;
    }
  if (_pid)
    {
      // DNS checker child still around: kill and reap it
      kill(_pid, SIGKILL);
      int status;
      while (waitpid(_pid, &status, 0) == -1)
        if (errno != EINTR)
          break;
      _pid = 0;
    }
  if (_dnspipe !=
-1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  // the destructor in MediaCurl doesn't call disconnect() if
  // the media is not attached, so we do it here manually
  disconnectFrom();
}

// true if the named environment variable is set to a non-empty value
static inline bool env_isset(string name)
{
  const char *s = getenv(name.c_str());
  return s && *s ? true : false;
}

// Pre-check that the mirror's hostname resolves, without blocking the
// scheduler: fork a child that runs getaddrinfo() and signals its exit
// status; the parent watches the pipe in the select() loop.  Skipped
// for numeric addresses, proxied connections and already-verified hosts.
void
multifetchworker::checkdns()
{
  string host = _url.getHost();

  if (host.empty())
    return;

  if (_request->_context->isDNSok(host))
    return;

  // no need to do dns checking for numeric hosts
  char addrbuf[128];
  if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
    return;
  if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
    return;

  // no need to do dns checking if we use a proxy
  if (!_settings.proxy().empty())
    return;
  if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
    return;
  string schemeproxy = _url.getScheme() + "_proxy";
  if (env_isset(schemeproxy))
    return;
  if (schemeproxy != "http_proxy")
    {
      // also honor the upper-case variant, e.g. FTP_PROXY
      std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
      if (env_isset(schemeproxy))
        return;
    }

  DBG << "checking DNS lookup of " << host << endl;
  int pipefds[2];
  if (pipe(pipefds))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
      return;
    }
  _pid = fork();
  if (_pid == pid_t(-1))
    {
      close(pipefds[0]);
      close(pipefds[1]);
      _pid = 0;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
      return;
    }
  else if (_pid == 0)
    {
      // child: do the blocking lookup and report via exit status
      close(pipefds[0]);
      // XXX: close all other file descriptors
      struct addrinfo *ai, aihints;
      memset(&aihints, 0, sizeof(aihints));
      aihints.ai_family = PF_UNSPEC;
      // probe for IPv6 support; fall back to IPv4-only lookups
      int tstsock = socket(PF_INET6, SOCK_DGRAM, 0);
      if (tstsock == -1)
        aihints.ai_family = PF_INET;
      else
        close(tstsock);
      aihints.ai_socktype = SOCK_STREAM;
      aihints.ai_flags = AI_CANONNAME;
      // bound the lookup with SIGALRM so a dead DNS server can't hang us
      unsigned int connecttimeout = _request->_connect_timeout;
      if (connecttimeout)
        alarm(connecttimeout);
      signal(SIGALRM, SIG_DFL);
      if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
        _exit(1);
      _exit(0);
    }
  // parent: keep the read end; child exit closes it and wakes select()
  close(pipefds[1]);
  _dnspipe = pipefds[0];
  _state = WORKER_LOOKUP;
}

// register the DNS pipe in the select() read set while looking up
void
multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
{
  if (_state != WORKER_LOOKUP)
    return;
  FD_SET(_dnspipe, &rset);
  if (maxfd < _dnspipe)
    maxfd = _dnspipe;
}

// Handle completion of the DNS checker child: reap it, translate the
// exit status into WORKER_BROKEN or a first job via nextjob().
void
multifetchworker::dnsevent(fd_set &rset)
{

  if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
    return;
  int status;
  while (waitpid(_pid, &status, 0) == -1)
    {
      if (errno != EINTR)
        return;
    }
  _pid = 0;
  if (_dnspipe != -1)
    {
      close(_dnspipe);
      _dnspipe = -1;
    }
  if (!WIFEXITED(status))
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  int exitcode = WEXITSTATUS(status);
  DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
  if (exitcode != 0)
    {
      _state = WORKER_BROKEN;
      strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
      _request->_activeworkers--;
      return;
    }
  // remember the good host so sibling workers skip the check
  _request->_context->setDNSok(_url.getHost());
  nextjob();
}

// verify the digest accumulated in writefunction() for the current block
bool
multifetchworker::checkChecksum()
{
  // DBG << "checkChecksum block " << _blkno << endl;
  if (!_blksize || !_request->_blklist)
    return true;
  return _request->_blklist->verifyDigest(_blkno, _dig);
}

// Re-verify the current block by reading it back from the output file.
// Needed when competing workers may have overwritten our data.
bool
multifetchworker::recheckChecksum()
{
  // DBG << "recheckChecksum block " << _blkno << endl;
  if (!_request->_fp || !_blksize || !_request->_blklist)
    return true;
  if (fseeko(_request->_fp, _blkstart, SEEK_SET))
    return false;
  char buf[4096];
  size_t l = _blksize;
  _request->_blklist->createDigest(_dig);   // resets digest
  while (l)
    {
      size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
      if (fread(buf, cnt, 1, _request->_fp) != 1)
        return false;
      _dig.update(buf, cnt);
      l -= cnt;
    }
  return _request->_blklist->verifyDigest(_blkno, _dig);
}


// Called when no unassigned blocks remain: pick another worker's
// in-flight block and download it in parallel ("competition"), or go
// to sleep if we are clearly slower than the block's current owner.
void
multifetchworker::stealjob()
{
  if (!_request->_stealing)
    {
      DBG << "start stealing!" << endl;
      _request->_stealing = true;
    }
  multifetchworker *best = 0;
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  double now = 0;
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_pass == -1)
        continue;                       // do not steal!
      if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
        continue;                       // do not steal finished jobs
      // derive a speed estimate for workers that have no average yet
      if (!worker->_avgspeed && worker->_blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > worker->_blkstarttime)
            worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
        }
      // prefer the lowest pass (least-stolen-from) worker
      if (!best || best->_pass > worker->_pass)
        {
          best = worker;
          continue;
        }
      if (best->_pass < worker->_pass)
        continue;
      // if it is the same block, we want to know the best worker, otherwise the worst
      if (worker->_blkstart == best->_blkstart)
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
      else
        {
          if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
            best = worker;
        }
    }
  if (!best)
    {
      // nothing left to steal: everything is finished or owned exclusively
      _state = WORKER_DONE;
      _request->_activeworkers--;
      _request->_finished = true;
      return;
    }
  // do not sleep twice
  if (_state != WORKER_SLEEP)
    {
      if (!_avgspeed && _blkreceived)
        {
          if (!now)
            now = currentTime();
          if (now > _blkstarttime)
            _avgspeed = _blkreceived / (now - _blkstarttime);
        }

      // lets see if we should sleep a bit
      DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
      DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
      if (_avgspeed && best->_avgspeed && best->_blksize - best->_blkreceived > 0 &&
          (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
        {
          if (!now)
            now = currentTime();
          // sleep for twice the owner's expected remaining time, capped at 1s
          double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
          if (sl > 1)
            sl = 1;
          DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
          _sleepuntil = now + sl;
          _state = WORKER_SLEEP;
          _request->_sleepworkers++;
          return;
        }
    }

  // race the owner on the same block
  _competing = true;
  best->_competing = true;
  _blkstart = best->_blkstart;
  _blksize = best->_blksize;
  best->_pass++;
  _pass = best->_pass;
  _blkno = best->_blkno;
  run();
}

// After we verified a block, demote every competitor still fetching
// the same block to DISCARD and mark their job as not-stealable.
void
multifetchworker::disableCompetition()
{
  std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
  for (; workeriter != _request->_workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      if (worker == this)
        continue;
      if (worker->_blkstart == _blkstart)
        {
          if (worker->_state == WORKER_FETCH)
            worker->_state = WORKER_DISCARD;
          worker->_pass = -1;           /* do not steal this one, we already have it */
        }
    }
}


// Assign the next unfetched block (or BLKSIZE-sized slice) to this
// worker and start the transfer; falls back to stealjob() when the
// file/block list is exhausted.
void
multifetchworker::nextjob()
{
  _noendrange = false;
  if (_request->_stealing)
    {
      stealjob();
      return;
    }

  MediaBlockList *blklist = _request->_blklist;
  if (!blklist)
    {
      // no block list: carve the file into fixed BLKSIZE slices
      _blksize = BLKSIZE;
      if (_request->_filesize != off_t(-1))
        {
          if (_request->_blkoff >= _request->_filesize)
            {
              stealjob();
              return;
            }
          _blksize = _request->_filesize - _request->_blkoff;
          if (_blksize > BLKSIZE)
            _blksize = BLKSIZE;
        }
    }
  else
    {
      // advance to the block containing the current offset
      MediaBlock blk = blklist->getBlock(_request->_blkno);
      while (_request->_blkoff >= blk.off + blk.size)
        {
          if (++_request->_blkno == blklist->numBlocks())
            {
              stealjob();
              return;
            }
          blk = blklist->getBlock(_request->_blkno);
          _request->_blkoff = blk.off;
        }
      _blksize = blk.off + blk.size -
_request->_blkoff;
      // only split checksummed blocks; otherwise we could not verify the parts
      if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
        _blksize = BLKSIZE;
    }
  _blkno = _request->_blkno;
  _blkstart = _request->_blkoff;
  _request->_blkoff += _blksize;
  run();
}

// Submit the worker's current block to the curl multi handle: set the
// HTTP byte range, add the easy handle and reset the per-block counters.
void
multifetchworker::run()
{
  char rangebuf[128];

  if (_state == WORKER_BROKEN || _state == WORKER_DONE)
    return;                             // just in case...
  if (_noendrange)
    sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
  else
    sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
  DBG << "#" << _workerno << ": BLK " << _blkno << ":" << rangebuf << " " << _url << endl;
  // a whole-file fetch from offset 0 needs no Range header at all
  if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
      return;
    }
  if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
    {
      _request->_activeworkers--;
      _state = WORKER_BROKEN;
      strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
      return;
    }
  _request->_havenewjob = true;
  _off = _blkstart;
  _size = _blksize;
  if (_request->_blklist)
    _request->_blklist->createDigest(_dig);   // resets digest
  _state = WORKER_FETCH;

  double now = currentTime();
  _blkstarttime = now;
  _blkreceived = 0;
}



// Set up the request bookkeeping; _totalsize comes from the block
// list if present, else from the supplied file size (if known).
multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) : _context(context), _filename(filename), _baseurl(baseurl)
{
  _fp = fp;
  _report = report;
  _blklist = blklist;
  _filesize = filesize;
  _multi = multi;
  _stealing = false;
  _havenewjob = false;
  _blkno = 0;
  if (_blklist)
    _blkoff = _blklist->getBlock(0).off;
  else
    _blkoff = 0;
  _activeworkers = 0;
  _lookupworkers = 0;
  _sleepworkers = 0;
  _minsleepuntil = 0;
  _finished = false;
  _fetchedsize = 0;
  _fetchedgoodsize = 0;
  _totalsize = 0;
  _lastperiodstart = _lastprogress = _starttime = currentTime();
  _lastperiodfetched = 0;
  _periodavg = 0;
  _timeout = 0;
  _connect_timeout = 0;
  _maxspeed = 0;
  _maxworkers = 0;
  if (blklist)
    {
      for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
        {
          MediaBlock blk = blklist->getBlock(blkno);
          _totalsize += blk.size;
        }
    }
  else if (filesize != off_t(-1))
    _totalsize = filesize;
}

// delete all workers (their destructors detach/pool the curl handles)
multifetchrequest::~multifetchrequest()
{
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      *workeriter = NULL;
      delete worker;
    }
  _workers.clear();
}

// Main scheduler loop: spawns workers for the mirror URLs, multiplexes
// DNS pipes and curl sockets via select(), drives curl_multi_perform,
// wakes sleepers, collects finished transfers, reschedules blocks and
// reports progress until the download finishes or times out.
void
multifetchrequest::run(std::vector<Url> &urllist)
{
  int workerno = 0;
  std::vector<Url>::iterator urliter = urllist.begin();
  for (;;)
    {
      fd_set rset, wset, xset;
      int maxfd, nqueue;

      if (_finished)
        {
          DBG << "finished!" << endl;
          break;
        }

      // start at most _maxworkers workers, at most MAXURLS mirrors
      if (_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
        {
          // spawn another worker!
          multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
          _workers.push_back(worker);
          if (worker->_state != WORKER_BROKEN)
            {
              _activeworkers++;
              if (worker->_state != WORKER_LOOKUP)
                {
                  worker->nextjob();
                }
              else
                _lookupworkers++;
            }
          ++urliter;
          continue;
        }
      if (!_activeworkers)
        {
          WAR << "No more active workers!" << endl;
          // show the first worker error we find
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              if ((*workeriter)->_state != WORKER_BROKEN)
                continue;
              ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
            }
          break;
        }

      FD_ZERO(&rset);
      FD_ZERO(&wset);
      FD_ZERO(&xset);

      curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);

      // also watch the DNS checker pipes of looking-up workers
      if (_lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          (*workeriter)->adddnsfd(rset, maxfd);

      timeval tv;
      // if we added a new job we have to call multi_perform once
      // to make it show up in the fd set. do not sleep in this case.
      tv.tv_sec = 0;
      tv.tv_usec = _havenewjob ? 0 : 200000;
      if (_sleepworkers && !_havenewjob)
        {
          // shorten the select timeout to the earliest sleeper wakeup
          if (_minsleepuntil == 0)
            {
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *worker = *workeriter;
                  if (worker->_state != WORKER_SLEEP)
                    continue;
                  if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
                    _minsleepuntil = worker->_sleepuntil;
                }
            }
          double sl = _minsleepuntil - currentTime();
          if (sl < 0)
            {
              sl = 0;
              _minsleepuntil = 0;
            }
          if (sl < .2)
            tv.tv_usec = sl * 1000000;
        }
      int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
      if (r == -1 && errno != EINTR)
        ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
      if (r != 0 && _lookupworkers)
        for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
          {
            multifetchworker *worker = *workeriter;
            if (worker->_state != WORKER_LOOKUP)
              continue;
            (*workeriter)->dnsevent(rset);
            if (worker->_state != WORKER_LOOKUP)
              _lookupworkers--;
          }
      _havenewjob = false;

      // run curl
      for (;;)
        {
          CURLMcode mcode;
          int tasks;
          mcode = curl_multi_perform(_multi, &tasks);
          if (mcode == CURLM_CALL_MULTI_PERFORM)
            continue;
          if (mcode != CURLM_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
          break;
        }

      double now = currentTime();

      // update periodavg (rolling rate over >= 0.5s windows)
      if (now > _lastperiodstart + .5)
        {
          if (!_periodavg)
            _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
          else
            _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
          _lastperiodfetched = _fetchedsize;
          _lastperiodstart = now;
        }

      // wake up sleepers
      if (_sleepworkers)
        {
          for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
            {
              multifetchworker *worker = *workeriter;
              if (worker->_state != WORKER_SLEEP)
                continue;
              if (worker->_sleepuntil > now)
                continue;
              if (_minsleepuntil == worker->_sleepuntil)
                _minsleepuntil = 0;
              DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
              _sleepworkers--;
              // nextjob chnages the state
              worker->nextjob();
            }
        }

      // collect all curl results, reschedule new jobs
      CURLMsg *msg;
      while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
        {
          if (msg->msg != CURLMSG_DONE)
            continue;
          CURL *easy = msg->easy_handle;
          CURLcode cc = msg->data.result;
          multifetchworker *worker;
          // CURLINFO_PRIVATE was set to the worker in its constructor
          if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
            ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
          // fold this block's throughput into the worker's average speed
          if (worker->_blkreceived && now > worker->_blkstarttime)
            {
              if (worker->_avgspeed)
                worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
              else
                worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
            }
          DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
          curl_multi_remove_handle(_multi, easy);
          if (cc == CURLE_HTTP_RETURNED_ERROR)
            {
              long statuscode = 0;
              (void)curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &statuscode);
              DBG << "HTTP status " << statuscode << endl;
              if (statuscode == 416 && !_blklist)   /* Range error */
                {
                  if (_filesize == off_t(-1))
                    {
                      // unknown size: retry once with an open-ended range
                      if (!worker->_noendrange)
                        {
                          DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
                          worker->_noendrange = true;
                          worker->run();
                          continue;
                        }
                      worker->_noendrange = false;
                      worker->stealjob();
                      continue;
                    }
                  if (worker->_blkstart >= _filesize)
                    {
                      // we asked past EOF; just move on to the next block
                      worker->nextjob();
                      continue;
                    }
                }
            }
          if (cc == 0)
            {
              if (!worker->checkChecksum())
                {
                  WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
                  worker->_state = WORKER_BROKEN;
                  strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
                  _activeworkers--;
                  continue;
                }
              if (worker->_state == WORKER_FETCH)
                {
                  if (worker->_competing)
                    {
                      worker->disableCompetition();
                      // multiple workers wrote into this block. We already know that our
                      // data was correct, but maybe some other worker overwrote our data
                      // with something broken. Thus we have to re-check the block.
                      if (!worker->recheckChecksum())
                        {
                          DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
                          // re-fetch! No need to worry about the bad workers,
                          // they will now be set to DISCARD. At the end of their block
                          // they will notice that they wrote bad data and go into BROKEN.
                          worker->run();
                          continue;
                        }
                    }
                  _fetchedgoodsize += worker->_blksize;
                }

              // make bad workers sleep a little
              double maxavg = 0;
              int maxworkerno = 0;
              int numbetter = 0;
              for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
                {
                  multifetchworker *oworker = *workeriter;
                  if (oworker->_state == WORKER_BROKEN)
                    continue;
                  if (oworker->_avgspeed > maxavg)
                    {
                      maxavg = oworker->_avgspeed;
                      maxworkerno = oworker->_workerno;
                    }
                  if (oworker->_avgspeed > worker->_avgspeed)
                    numbetter++;
                }
              if (maxavg && !_stealing)
                {
                  // sleep time grows with how far below the fastest worker we are
                  double ratio = worker->_avgspeed / maxavg;
                  ratio = 1 - ratio;
                  if (numbetter < 3)    // don't sleep that much if we're in the top two
                    ratio = ratio * ratio;
                  if (ratio > .01)
                    {
                      DBG << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
                      worker->_sleepuntil = now + ratio;
                      worker->_state = WORKER_SLEEP;
                      _sleepworkers++;
                      continue;
                    }
                }

              // do rate control (if requested)
              // should use periodavg, but that's not what libcurl does
              if (_maxspeed && now > _starttime)
                {
                  double avg = _fetchedsize / (now - _starttime);
                  avg = worker->_maxspeed * _maxspeed / avg;
                  if (avg < _maxspeed / _maxworkers)
                    avg = _maxspeed / _maxworkers;
                  if (avg > _maxspeed)
                    avg = _maxspeed;
                  if (avg < 1024)
                    avg = 1024;
                  worker->_maxspeed = avg;
                  curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
                }

              worker->nextjob();
            }
          else
            {
              // transfer failed: retire this worker
              worker->_state = WORKER_BROKEN;
              _activeworkers--;
              if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
                {
                  // end of workers reached! goodbye!
                  worker->evaluateCurlCode(Pathname(), cc, false);
                }
            }
        }

      // send report
      if (_report)
        {
          int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
          double avg = 0;
          if (now > _starttime)
            avg = _fetchedsize / (now - _starttime);
          if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
            ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
        }

      // overall stall timeout: no byte arrived for _timeout seconds
      if (_timeout && now - _lastprogress > _timeout)
        break;
    }

  if (!_finished)
    ZYPP_THROW(MediaTimeoutException(_baseurl));

  // print some download stats
  WAR << "overall result" << endl;
  for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
    {
      multifetchworker *worker = *workeriter;
      WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
    }
}




MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
    : MediaCurl(url_r, attach_point_hint_r)
{
  MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
  _multi = 0;
  _customHeadersMetalink = 0;
}

// free the metalink header list, the multi handle and all pooled easy handles
MediaMultiCurl::~MediaMultiCurl()
{
  if (_customHeadersMetalink)
    {
      curl_slist_free_all(_customHeadersMetalink);
      _customHeadersMetalink = 0;
    }
  if (_multi)
    {
      curl_multi_cleanup(_multi);
      _multi = 0;
    }
  std::map<std::string, CURL *>::iterator it;
  for (it = _easypool.begin(); it != _easypool.end(); it++)
    {
      CURL *easy = it->second;
      if (easy)
        {
curl_easy_cleanup(easy); 01175 it->second = NULL; 01176 } 01177 } 01178 } 01179 01180 void MediaMultiCurl::setupEasy() 01181 { 01182 MediaCurl::setupEasy(); 01183 01184 if (_customHeadersMetalink) 01185 { 01186 curl_slist_free_all(_customHeadersMetalink); 01187 _customHeadersMetalink = 0; 01188 } 01189 struct curl_slist *sl = _customHeaders; 01190 for (; sl; sl = sl->next) 01191 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data); 01192 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml"); 01193 } 01194 01195 static bool looks_like_metalink(const Pathname & file) 01196 { 01197 char buf[256], *p; 01198 int fd, l; 01199 if ((fd = open(file.asString().c_str(), O_RDONLY)) == -1) 01200 return false; 01201 while ((l = read(fd, buf, sizeof(buf) - 1)) == -1 && errno == EINTR) 01202 ; 01203 close(fd); 01204 if (l == -1) 01205 return 0; 01206 buf[l] = 0; 01207 p = buf; 01208 while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n') 01209 p++; 01210 if (!strncasecmp(p, "<?xml", 5)) 01211 { 01212 while (*p && *p != '>') 01213 p++; 01214 if (*p == '>') 01215 p++; 01216 while (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n') 01217 p++; 01218 } 01219 bool ret = !strncasecmp(p, "<metalink", 9) ? 
true : false; 01220 DBG << "looks_like_metalink(" << file << "): " << ret << endl; 01221 return ret; 01222 } 01223 01224 void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const 01225 { 01226 Pathname dest = target.absolutename(); 01227 if( assert_dir( dest.dirname() ) ) 01228 { 01229 DBG << "assert_dir " << dest.dirname() << " failed" << endl; 01230 Url url(getFileUrl(filename)); 01231 ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) ); 01232 } 01233 string destNew = target.asString() + ".new.zypp.XXXXXX"; 01234 char *buf = ::strdup( destNew.c_str()); 01235 if( !buf) 01236 { 01237 ERR << "out of memory for temp file name" << endl; 01238 Url url(getFileUrl(filename)); 01239 ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name")); 01240 } 01241 01242 int tmp_fd = ::mkstemp( buf ); 01243 if( tmp_fd == -1) 01244 { 01245 free( buf); 01246 ERR << "mkstemp failed for file '" << destNew << "'" << endl; 01247 ZYPP_THROW(MediaWriteException(destNew)); 01248 } 01249 destNew = buf; 01250 free( buf); 01251 01252 FILE *file = ::fdopen( tmp_fd, "w" ); 01253 if ( !file ) { 01254 ::close( tmp_fd); 01255 filesystem::unlink( destNew ); 01256 ERR << "fopen failed for file '" << destNew << "'" << endl; 01257 ZYPP_THROW(MediaWriteException(destNew)); 01258 } 01259 DBG << "dest: " << dest << endl; 01260 DBG << "temp: " << destNew << endl; 01261 01262 // set IFMODSINCE time condition (no download if not modified) 01263 if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) ) 01264 { 01265 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE); 01266 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime()); 01267 } 01268 else 01269 { 01270 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE); 01271 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L); 01272 } 
01273 // change header to include Accept: metalink 01274 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink); 01275 try 01276 { 01277 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options); 01278 } 01279 catch (Exception &ex) 01280 { 01281 ::fclose(file); 01282 filesystem::unlink(destNew); 01283 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE); 01284 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L); 01285 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders); 01286 ZYPP_RETHROW(ex); 01287 } 01288 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE); 01289 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L); 01290 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders); 01291 long httpReturnCode = 0; 01292 CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode); 01293 if (infoRet == CURLE_OK) 01294 { 01295 DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl; 01296 if ( httpReturnCode == 304 01297 || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified 01298 { 01299 DBG << "not modified: " << PathInfo(dest) << endl; 01300 return; 01301 } 01302 } 01303 else 01304 { 01305 WAR << "Could not get the reponse code." 
<< endl; 01306 } 01307 01308 bool ismetalink = false; 01309 01310 char *ptr = NULL; 01311 if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr) 01312 { 01313 string ct = string(ptr); 01314 if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0) 01315 ismetalink = true; 01316 } 01317 01318 if (!ismetalink) 01319 { 01320 // some proxies do not store the content type, so also look at the file to find 01321 // out if we received a metalink (bnc#649925) 01322 fflush(file); 01323 if (looks_like_metalink(Pathname(destNew))) 01324 ismetalink = true; 01325 } 01326 01327 if (ismetalink) 01328 { 01329 bool userabort = false; 01330 fclose(file); 01331 file = NULL; 01332 Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed"; 01333 try 01334 { 01335 MetaLinkParser mlp; 01336 mlp.parse(Pathname(destNew)); 01337 MediaBlockList bl = mlp.getBlockList(); 01338 vector<Url> urls = mlp.getUrls(); 01339 DBG << bl << endl; 01340 file = fopen(destNew.c_str(), "w+"); 01341 if (!file) 01342 ZYPP_THROW(MediaWriteException(destNew)); 01343 if (PathInfo(target).isExist()) 01344 { 01345 DBG << "reusing blocks from file " << target << endl; 01346 bl.reuseBlocks(file, target.asString()); 01347 DBG << bl << endl; 01348 } 01349 if (bl.haveChecksum(1) && PathInfo(failedFile).isExist()) 01350 { 01351 DBG << "reusing blocks from file " << failedFile << endl; 01352 bl.reuseBlocks(file, failedFile.asString()); 01353 DBG << bl << endl; 01354 filesystem::unlink(failedFile); 01355 } 01356 Pathname df = deltafile(); 01357 if (!df.empty()) 01358 { 01359 DBG << "reusing blocks from file " << df << endl; 01360 bl.reuseBlocks(file, df.asString()); 01361 DBG << bl << endl; 01362 } 01363 try 01364 { 01365 multifetch(filename, file, &urls, &report, &bl); 01366 } 01367 catch (MediaCurlException &ex) 01368 { 01369 userabort = ex.errstr() == "User abort"; 01370 ZYPP_RETHROW(ex); 01371 } 01372 } 01373 catch (Exception &ex) 01374 { 01375 
// something went wrong. fall back to normal download 01376 if (file) 01377 fclose(file); 01378 file = NULL; 01379 if (PathInfo(destNew).size() >= 63336) 01380 { 01381 ::unlink(failedFile.asString().c_str()); 01382 filesystem::hardlinkCopy(destNew, failedFile); 01383 } 01384 if (userabort) 01385 { 01386 filesystem::unlink(destNew); 01387 ZYPP_RETHROW(ex); 01388 } 01389 file = fopen(destNew.c_str(), "w+"); 01390 if (!file) 01391 ZYPP_THROW(MediaWriteException(destNew)); 01392 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START); 01393 } 01394 } 01395 01396 if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 ))) 01397 { 01398 ERR << "Failed to chmod file " << destNew << endl; 01399 } 01400 if (::fclose(file)) 01401 { 01402 filesystem::unlink(destNew); 01403 ERR << "Fclose failed for file '" << destNew << "'" << endl; 01404 ZYPP_THROW(MediaWriteException(destNew)); 01405 } 01406 if ( rename( destNew, dest ) != 0 ) 01407 { 01408 ERR << "Rename failed" << endl; 01409 ZYPP_THROW(MediaWriteException(dest)); 01410 } 01411 DBG << "done: " << PathInfo(dest) << endl; 01412 } 01413 01414 void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const 01415 { 01416 Url baseurl(getFileUrl(filename)); 01417 if (blklist && filesize == off_t(-1) && blklist->haveFilesize()) 01418 filesize = blklist->getFilesize(); 01419 if (blklist && !blklist->haveBlocks() && filesize != 0) 01420 blklist = 0; 01421 if (blklist && (filesize == 0 || !blklist->numBlocks())) 01422 { 01423 checkFileDigest(baseurl, fp, blklist); 01424 return; 01425 } 01426 if (filesize == 0) 01427 return; 01428 if (!_multi) 01429 { 01430 _multi = curl_multi_init(); 01431 if (!_multi) 01432 ZYPP_THROW(MediaCurlInitException(baseurl)); 01433 } 01434 multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize); 01435 
req._timeout = _settings.timeout(); 01436 req._connect_timeout = _settings.connectTimeout(); 01437 req._maxspeed = _settings.maxDownloadSpeed(); 01438 req._maxworkers = _settings.maxConcurrentConnections(); 01439 if (req._maxworkers > MAXURLS) 01440 req._maxworkers = MAXURLS; 01441 if (req._maxworkers <= 0) 01442 req._maxworkers = 1; 01443 std::vector<Url> myurllist; 01444 for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter) 01445 { 01446 try 01447 { 01448 string scheme = urliter->getScheme(); 01449 if (scheme == "http" || scheme == "https" || scheme == "ftp") 01450 { 01451 checkProtocol(*urliter); 01452 myurllist.push_back(*urliter); 01453 } 01454 } 01455 catch (...) 01456 { 01457 } 01458 } 01459 if (!myurllist.size()) 01460 myurllist.push_back(baseurl); 01461 req.run(myurllist); 01462 checkFileDigest(baseurl, fp, blklist); 01463 } 01464 01465 void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const 01466 { 01467 if (!blklist || !blklist->haveFileChecksum()) 01468 return; 01469 if (fseeko(fp, off_t(0), SEEK_SET)) 01470 ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error")); 01471 Digest dig; 01472 blklist->createFileDigest(dig); 01473 char buf[4096]; 01474 size_t l; 01475 while ((l = fread(buf, 1, sizeof(buf), fp)) > 0) 01476 dig.update(buf, l); 01477 if (!blklist->verifyFileDigest(dig)) 01478 ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error")); 01479 } 01480 01481 bool MediaMultiCurl::isDNSok(const string &host) const 01482 { 01483 return _dnsok.find(host) == _dnsok.end() ? 
false : true; 01484 } 01485 01486 void MediaMultiCurl::setDNSok(const string &host) const 01487 { 01488 _dnsok.insert(host); 01489 } 01490 01491 CURL *MediaMultiCurl::fromEasyPool(const string &host) const 01492 { 01493 if (_easypool.find(host) == _easypool.end()) 01494 return 0; 01495 CURL *ret = _easypool[host]; 01496 _easypool.erase(host); 01497 return ret; 01498 } 01499 01500 void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const 01501 { 01502 CURL *oldeasy = _easypool[host]; 01503 _easypool[host] = easy; 01504 if (oldeasy) 01505 curl_easy_cleanup(oldeasy); 01506 } 01507 01508 } // namespace media 01509 } // namespace zypp 01510