XRootD
Loading...
Searching...
No Matches
XrdTpcTPC.cc
Go to the documentation of this file.
4#include "XrdOuc/XrdOucEnv.hh"
8#include "XrdSys/XrdSysFD.hh"
9#include "XrdVersion.hh"
10
13#include "XrdTpc/XrdTpcUtils.hh"
14
15#include <curl/curl.h>
16
17#include <dlfcn.h>
18#include <fcntl.h>
19
20#include <algorithm>
21#include <memory>
22#include <sstream>
23#include <stdexcept>
24#include <thread>
25#include <iostream> // Delete later!!!
26
27#include "XrdTpcState.hh"
28#include "XrdTpcStream.hh"
29#include "XrdTpcTPC.hh"
30#include "XrdTpcCurlMulti.hh"
31#include <fstream>
32
33using namespace TPC;
34
35XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
36
37uint64_t TPCHandler::m_monid{0};
38int TPCHandler::m_marker_period = 5;
39size_t TPCHandler::m_block_size = 16*1024*1024;
40size_t TPCHandler::m_small_block_size = 1*1024*1024;
41XrdSysMutex TPCHandler::m_monid_mutex;
42
44
45/******************************************************************************/
46/* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
47/******************************************************************************/
48
49TPCHandler::TPCLogRecord::~TPCLogRecord()
50{
51// Record monitoring data is enabled
52//
53 if (tpcMonitor)
55
56 monInfo.clID = clID.c_str();
57 monInfo.begT = begT;
58 gettimeofday(&monInfo.endT, 0);
59
60 if (log_prefix == "PullRequest")
61 {monInfo.dstURL = local.c_str();
62 monInfo.srcURL = remote.c_str();
63 } else {
64 monInfo.dstURL = remote.c_str();
65 monInfo.srcURL = local.c_str();
67 }
68
69 if (!status) monInfo.endRC = 0;
70 else if (tpc_status > 0) monInfo.endRC = tpc_status;
71 else monInfo.endRC = 1;
72 monInfo.strm = static_cast<unsigned char>(streams);
73 monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
74 if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
75
76 tpcMonitor->Report(monInfo);
77 }
78}
79
80/******************************************************************************/
81/* C u r l D e l e t e r : : o p e r a t o r ( ) */
82/******************************************************************************/
83
85{
86 if (curl) curl_easy_cleanup(curl);
87}
88
89/******************************************************************************/
90/* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
91/******************************************************************************/
92
101int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
102 TPCLogRecord * rec = (TPCLogRecord *)clientp;
103 if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
104 // We will not reach this callback if the corresponding socket could not have been connected
105 // the socket is already connected only if the packet marking is enabled
106 return CURL_SOCKOPT_ALREADY_CONNECTED;
107 }
108 return CURL_SOCKOPT_OK;
109}
110
111/******************************************************************************/
112/* o p e n s o c k e t _ c a l l b a c k */
113/******************************************************************************/
114
115
120int TPCHandler::opensocket_callback(void *clientp,
121 curlsocktype purpose,
122 struct curl_sockaddr *aInfo)
123{
124 //Return a socket file descriptor (note the clo_exec flag will be set).
125 int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
126 // See what kind of address will be used to connect
127 //
128 if(fd < 0) {
129 return CURL_SOCKET_BAD;
130 }
131 TPCLogRecord * rec = (TPCLogRecord *)clientp;
132 if (purpose == CURLSOCKTYPE_IPCXN && clientp)
133 {XrdNetAddr thePeer(&(aInfo->addr));
134 rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6)
135 && !thePeer.isMapped());
136 std::stringstream connectErrMsg;
137
138 if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
139 rec->m_log->Emsg(rec->log_prefix.c_str(),"Unable to connect socket:", connectErrMsg.str().c_str());
140 return CURL_SOCKET_BAD;
141 }
142 }
143
144 return fd;
145}
146
147int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
148 TPCLogRecord * rec = (TPCLogRecord *)clientp;
149
150 // Destroy the PMark handle associated to the file descriptor before closing it.
151 // Otherwise, we would lose the socket usage information if the socket is closed before
152 // the PMark handle is closed.
153 rec->pmarkManager.endPmark(fd);
154
155 return close(fd);
156}
157
158/******************************************************************************/
159/* p r e p a r e U R L */
160/******************************************************************************/
161
162// We need to utilize the full URL (including the query string), not just the
163// resource name. The query portion is hidden in the `xrd-http-query` header;
164// we take this out and combine it with the resource name.
165// We also append the value of the headers configured in tpc.header2cgi to the resource full URL
166//
167// One special key is `authz`; this is always stripped out and copied to the Authorization
168// header (which will later be used for XrdSecEntity). The latter copy is only done if
169// the Authorization header is not already present.
170//
171// hasSetOpaque will be set to true if at least one opaque data has been set in the URL that is returned,
172// false otherwise
173std::string TPCHandler::prepareURL(XrdHttpExtReq &req, bool & hasSetOpaque) {
174 return XrdTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap,hasSetOpaque);
175}
176
177std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
178 bool foundHeader;
179 return prepareURL(req,foundHeader);
180}
181
182/******************************************************************************/
183/* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
184/******************************************************************************/
185
186// When processing a redirection from the filesystem layer, it is permitted to return
187// some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
188// more permissive than a URI (basically, only '&' and '=' are disallowed while some
189// URI parsers may dislike characters like '"'). This function takes an opaque string
190// (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
191std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
192{
193 std::stringstream parser(opaque);
194 std::string sequence;
195 std::stringstream output;
196 bool first = true;
197 while (getline(parser, sequence, '&')) {
198 if (sequence.empty()) {continue;}
199 size_t equal_pos = sequence.find('=');
200 char *val = NULL;
201 if (equal_pos != std::string::npos)
202 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
203 // Do not emit parameter if value exists and escaping failed.
204 if (!val && equal_pos != std::string::npos) {continue;}
205
206 if (!first) output << "&";
207 first = false;
208 output << sequence.substr(0, equal_pos);
209 if (val) {
210 output << "=" << val;
211 curl_free(val);
212 }
213 }
214 return output.str();
215}
216
217/******************************************************************************/
218/* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
219/******************************************************************************/
220
221void
222TPCHandler::ConfigureCurlCA(CURL *curl)
223{
224 auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
225 auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
226 if (!ca_filename.empty() && !crl_filename.empty()) {
227 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
228 //Check that the CRL file contains at least one entry before setting this option to curl
229 //Indeed, an empty CRL file will make curl unhappy and therefore will fail
230 //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
231 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
232 if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
233 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
234 } else {
235 std::ostringstream oss;
236 oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
237 m_log.Log(Warning,"TpcHandler",oss.str().c_str());
238 }
239 }
240 else if (!m_cadir.empty()) {
241 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
242 }
243 if (!m_cafile.empty()) {
244 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
245 }
246}
247
248
249bool TPCHandler::MatchesPath(const char *verb, const char *path) {
250 return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
251}
252
253/******************************************************************************/
254/* P r e p a r e U R L */
255/******************************************************************************/
256
// Rewrites a "davs://" URL to the equivalent "https://" URL. Any input that
// does not start with exactly "davs://" (case-sensitive) is returned unchanged.
static std::string PrepareURL(const std::string &input) {
    static const char dav_scheme[] = "davs://";
    const std::string::size_type scheme_len = sizeof(dav_scheme) - 1;

    if (input.compare(0, scheme_len, dav_scheme) != 0) {
        return input;
    }
    return "https://" + input.substr(scheme_len);
}
263
264/******************************************************************************/
265/* T P C H a n d l e r : : P r o c e s s R e q */
266/******************************************************************************/
267
269 if (req.verb == "OPTIONS") {
270 return ProcessOptionsReq(req);
271 }
272 auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
273 if (header != req.headers.end()) {
274 if (header->second != "none") {
275 m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
276 return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
277 }
278 }
279 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
280 if (header != req.headers.end()) {
281 std::string src = PrepareURL(header->second);
282 return ProcessPullReq(src, req);
283 }
284 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
285 if (header != req.headers.end()) {
286 return ProcessPushReq(header->second, req);
287 }
288 m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
289 return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
290}
291
292/******************************************************************************/
293/* T P C H a n d l e r D e s t r u c t o r */
294/******************************************************************************/
295
297 m_sfs = NULL;
298}
299
300/******************************************************************************/
301/* T P C H a n d l e r C o n s t r u c t o r */
302/******************************************************************************/
303
304TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
305 m_desthttps(false),
306 m_fixed_route(false),
307 m_timeout(60),
308 m_first_timeout(120),
309 m_log(log->logger(), "TPC_"),
310 m_sfs(NULL)
311{
312 if (!Configure(config, myEnv)) {
313 throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
314 }
315
316// Extract out the TPC monitoring object (we share it with xrootd).
317//
318 XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
319 if (gs)
320 TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
321}
322
323/******************************************************************************/
324/* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
325/******************************************************************************/
326
330int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
331 return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
332}
333
334/******************************************************************************/
335/* T P C H a n d l e r : : G e t A u t h z */
336/******************************************************************************/
337
338std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
339 std::string authz;
340 auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
341 if (authz_header != req.headers.end()) {
342 char * quoted_url = quote(authz_header->second.c_str());
343 std::stringstream ss;
344 ss << "authz=" << quoted_url;
345 free(quoted_url);
346 authz = ss.str();
347 }
348 return authz;
349}
350
351/******************************************************************************/
352/* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
353/******************************************************************************/
354
355int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
356 XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
357{
358 int port;
359 const char *ptr = error.getErrText(port);
360 if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
361 rec.status = 500;
362 std::stringstream ss;
363 ss << "Internal error: redirect without hostname";
364 logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
365 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
366 }
367
368 // Construct redirection URL taking into consideration any opaque info
369 std::string rdr_info = ptr;
370 std::string host, opaque;
371 size_t pos = rdr_info.find('?');
372 host = rdr_info.substr(0, pos);
373
374 if (pos != std::string::npos) {
375 opaque = rdr_info.substr(pos + 1);
376 }
377
378 std::stringstream ss;
379 ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
380
381 if (!opaque.empty()) {
382 ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
383 }
384
385 rec.status = 307;
386 logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
387 return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
388 NULL, 0);
389}
390
391/******************************************************************************/
392/* T P C H a n d l e r : : O p e n W a i t S t a l l */
393/******************************************************************************/
394
395int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
396 int mode, int openMode, const XrdSecEntity &sec,
397 const std::string &authz)
398{
399 int open_result;
400 while (1) {
401 int orig_ucap = fh.error.getUCap();
402 fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
403 std::string opaque;
404 size_t pos = resource.find('?');
405 // Extract the path and opaque info from the resource
406 std::string path = resource.substr(0, pos);
407
408 if (pos != std::string::npos) {
409 opaque = resource.substr(pos + 1);
410 }
411
412 // Append the authz information if there are some
413 if(!authz.empty()) {
414 opaque += (opaque.empty() ? "" : "&");
415 opaque += authz;
416 }
417 open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
418
419 if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
420 int secs_to_stall = fh.error.getErrInfo();
421 if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
422 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
423 }
424 break;
425 }
426 return open_result;
427}
428
429/******************************************************************************/
430/* XRD_CHUNK_RESP: */
431/* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
432/******************************************************************************/
433
434#ifdef XRD_CHUNK_RESP
435
436
437
441int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
442 bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
443 success = false;
444 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
445 CURLcode res;
446 res = curl_easy_perform(curl);
447 //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
448 //don't want the next curl call to do be a HEAD request
449 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
450 if (res == CURLE_HTTP_RETURNED_ERROR) {
451 std::stringstream ss;
452 ss << "Remote server failed request";
453 std::stringstream ss2;
454 ss2 << ss.str() << ": " << curl_easy_strerror(res);
455 rec.status = 500;
456 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
457 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
458 } else if (state.GetStatusCode() >= 400) {
459 std::stringstream ss;
460 ss << "Remote side " << req.clienthost << " failed with status code " << state.GetStatusCode();
461 rec.status = 500;
462 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
463 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
464 } else if (res) {
465 std::stringstream ss;
466 ss << "Internal transfer failure";
467 std::stringstream ss2;
468 ss2 << ss.str() << " - HTTP library failed: " << curl_easy_strerror(res);
469 rec.status = 500;
470 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
471 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
472 }
473 std::stringstream ss;
474 ss << "Successfully determined remote size for pull request: "
475 << state.GetContentLength();
476 logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
477 success = true;
478 return 0;
479}
480
481int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
482 State state(curl,req.tpcForwardCreds);
483 //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
484 //it will fail
485 state.CopyHeaders(req);
486 int result;
487 //In case we cannot get the content length, we don't return anything to the client
488 if ((result = DetermineXferSize(curl, req, state, success, rec, false)) || !success) {
489 return result;
490 }
491 contentLength = state.GetContentLength();
492 return result;
493}
494
495/******************************************************************************/
496/* XRD_CHUNK_RESP: */
497/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
498/******************************************************************************/
499
500int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
501 std::stringstream ss;
502 const std::string crlf = "\n";
503 ss << "Perf Marker" << crlf;
504 ss << "Timestamp: " << time(NULL) << crlf;
505 ss << "Stripe Index: 0" << crlf;
506 ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
507 ss << "Total Stripe Count: 1" << crlf;
508 // Include the TCP connection associated with this transfer; used by
509 // the TPC client for monitoring purposes.
510 std::string desc = state.GetConnectionDescription();
511 if (!desc.empty())
512 ss << "RemoteConnections: " << desc << crlf;
513 ss << "End" << crlf;
514 rec.bytes_transferred = state.BytesTransferred();
515 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
516
517 return req.ChunkResp(ss.str().c_str(), 0);
518}
519
520/******************************************************************************/
521/* XRD_CHUNK_RESP: */
522/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
523/******************************************************************************/
524
525int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
526 off_t bytes_transferred)
527{
528 // The 'performance marker' format is largely derived from how GridFTP works
529 // (e.g., the concept of `Stripe` is not quite so relevant here). See:
530 // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
531 // Example marker:
532 // Perf Marker\n
533 // Timestamp: 1537788010\n
534 // Stripe Index: 0\n
535 // Stripe Bytes Transferred: 238745\n
536 // Total Stripe Count: 1\n
537 // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
538 // End\n
539 //
540 std::stringstream ss;
541 const std::string crlf = "\n";
542 ss << "Perf Marker" << crlf;
543 ss << "Timestamp: " << time(NULL) << crlf;
544 ss << "Stripe Index: 0" << crlf;
545 ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
546 ss << "Total Stripe Count: 1" << crlf;
547 // Build a list of TCP connections associated with this transfer; used by
548 // the TPC client for monitoring purposes.
549 bool first = true;
550 std::stringstream ss2;
551 for (std::vector<State*>::const_iterator iter = state.begin();
552 iter != state.end(); iter++)
553 {
554 std::string desc = (*iter)->GetConnectionDescription();
555 if (!desc.empty()) {
556 ss2 << (first ? "" : ",") << desc;
557 first = false;
558 }
559 }
560 if (!first)
561 ss << "RemoteConnections: " << ss2.str() << crlf;
562 ss << "End" << crlf;
563 rec.bytes_transferred = bytes_transferred;
564 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
565
566 return req.ChunkResp(ss.str().c_str(), 0);
567}
568
569/******************************************************************************/
570/* XRD_CHUNK_RESP: */
571/* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
572/******************************************************************************/
573
574int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
575 TPCLogRecord &rec)
576{
577 // Create the multi-handle and add in the current transfer to it.
578 CURLM *multi_handle = curl_multi_init();
579 if (!multi_handle) {
580 rec.status = 500;
581 logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
582 "Failed to initialize a libcurl multi-handle");
583 std::stringstream ss;
584 ss << "Failed to initialize internal server memory";
585 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
586 }
587
588 //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
589
590 CURLMcode mres;
591 mres = curl_multi_add_handle(multi_handle, curl);
592 if (mres) {
593 rec.status = 500;
594 std::stringstream ss;
595 ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
596 logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
597 curl_multi_cleanup(multi_handle);
598 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
599 }
600
601 // Start response to client prior to the first call to curl_multi_perform
602 int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
603 if (retval) {
604 curl_multi_cleanup(multi_handle);
605 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
606 "Failed to send the initial response to the TPC client");
607 return retval;
608 } else {
609 logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
610 "Initial transfer response sent to the TPC client");
611 }
612
613 // Transfer loop: use curl to actually run the transfer, but periodically
614 // interrupt things to send back performance updates to the client.
615 int running_handles = 1;
616 time_t last_marker = 0;
617 // Track how long it's been since the last time we recorded more bytes being transferred.
618 off_t last_advance_bytes = 0;
619 time_t last_advance_time = time(NULL);
620 time_t transfer_start = last_advance_time;
621 CURLcode res = static_cast<CURLcode>(-1);
622 do {
623 time_t now = time(NULL);
624 time_t next_marker = last_marker + m_marker_period;
625 if (now >= next_marker) {
626 off_t bytes_xfer = state.BytesTransferred();
627 if (bytes_xfer > last_advance_bytes) {
628 last_advance_bytes = bytes_xfer;
629 last_advance_time = now;
630 }
631 if (SendPerfMarker(req, rec, state)) {
632 curl_multi_remove_handle(multi_handle, curl);
633 curl_multi_cleanup(multi_handle);
634 logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
635 "Failed to send a perf marker to the TPC client");
636 return -1;
637 }
638 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
639 if (now > last_advance_time + timeout) {
640 const char *log_prefix = rec.log_prefix.c_str();
641 bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
642
643 state.SetErrorCode(10);
644 std::stringstream ss;
645 ss << "Transfer failed because no bytes have been "
646 << (tpc_pull ? "received from the source (pull mode) in "
647 : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
648 state.SetErrorMessage(ss.str());
649 curl_multi_remove_handle(multi_handle, curl);
650 curl_multi_cleanup(multi_handle);
651 break;
652 }
653 last_marker = now;
654 }
655 // The transfer will start after this point, notify the packet marking manager
656 rec.pmarkManager.startTransfer();
657 mres = curl_multi_perform(multi_handle, &running_handles);
658 if (mres == CURLM_CALL_MULTI_PERFORM) {
659 // curl_multi_perform should be called again immediately. On newer
660 // versions of curl, this is no longer used.
661 continue;
662 } else if (mres != CURLM_OK) {
663 break;
664 } else if (running_handles == 0) {
665 break;
666 }
667
668 rec.pmarkManager.beginPMarks();
669 //printf("There are %d running handles\n", running_handles);
670
671 // Harvest any messages, looking for CURLMSG_DONE.
672 CURLMsg *msg;
673 do {
674 int msgq = 0;
675 msg = curl_multi_info_read(multi_handle, &msgq);
676 if (msg && (msg->msg == CURLMSG_DONE)) {
677 CURL *easy_handle = msg->easy_handle;
678 res = msg->data.result;
679 curl_multi_remove_handle(multi_handle, easy_handle);
680 }
681 } while (msg);
682
683 int64_t max_sleep_time = next_marker - time(NULL);
684 if (max_sleep_time <= 0) {
685 continue;
686 }
687 int fd_count;
688#ifdef HAVE_CURL_MULTI_WAIT
689 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
690#else
691 mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000, &fd_count);
692#endif
693 if (mres != CURLM_OK) {
694 break;
695 }
696 } while (running_handles);
697
698 if (mres != CURLM_OK) {
699 std::stringstream ss;
700 ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
701 logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
702
703 curl_multi_remove_handle(multi_handle, curl);
704 curl_multi_cleanup(multi_handle);
705
706 if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
707 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
708 "Failed to send error message to the TPC client");
709 return retval;
710 }
711 return req.ChunkResp(NULL, 0);
712 }
713
714 // Harvest any messages, looking for CURLMSG_DONE.
715 CURLMsg *msg;
716 do {
717 int msgq = 0;
718 msg = curl_multi_info_read(multi_handle, &msgq);
719 if (msg && (msg->msg == CURLMSG_DONE)) {
720 CURL *easy_handle = msg->easy_handle;
721 res = msg->data.result;
722 curl_multi_remove_handle(multi_handle, easy_handle);
723 }
724 } while (msg);
725
726 if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
727 curl_multi_remove_handle(multi_handle, curl);
728 curl_multi_cleanup(multi_handle);
729 std::stringstream ss;
730 ss << "Internal state error in libcurl";
731 logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
732
733 if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
734 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
735 "Failed to send error message to the TPC client");
736 return retval;
737 }
738 return req.ChunkResp(NULL, 0);
739 }
740 curl_multi_cleanup(multi_handle);
741
742 state.Flush();
743
744 rec.bytes_transferred = state.BytesTransferred();
745 rec.tpc_status = state.GetStatusCode();
746
747 // Explicitly finalize the stream (which will close the underlying file
748 // handle) before the response is sent. In some cases, subsequent HTTP
749 // requests can occur before the filesystem is done closing the handle -
750 // and those requests may occur against partial data.
751 state.Finalize();
752
753 // Generate the final response back to the client.
754 std::stringstream ss;
755 bool success = false;
756 if (state.GetStatusCode() >= 400) {
757 std::string err = state.GetErrorMessage();
758 std::stringstream ss2;
759 ss2 << "Remote side failed with status code " << state.GetStatusCode();
760 if (!err.empty()) {
761 std::replace(err.begin(), err.end(), '\n', ' ');
762 ss2 << "; error message: \"" << err << "\"";
763 }
764 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
765 ss << generateClientErr(ss2, rec);
766 } else if (state.GetErrorCode()) {
767 std::string err = state.GetErrorMessage();
768 if (err.empty()) {err = "(no error message provided)";}
769 else {std::replace(err.begin(), err.end(), '\n', ' ');}
770 std::stringstream ss2;
771 ss2 << "Error when interacting with local filesystem: " << err;
772 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
773 ss << generateClientErr(ss2, rec);
774 } else if (res != CURLE_OK) {
775 std::stringstream ss2;
776 ss2 << "Internal transfer failure";
777 std::stringstream ss3;
778 ss3 << ss2.str() << ": " << curl_easy_strerror(res);
779 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
780 ss << generateClientErr(ss2, rec, res);
781 } else {
782 ss << "success: Created";
783 success = true;
784 }
785
786 if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
787 logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
788 "Failed to send last update to remote client");
789 return retval;
790 } else if (success) {
791 logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
792 rec.status = 0;
793 }
794 return req.ChunkResp(NULL, 0);
795}
796
797/******************************************************************************/
798/* !XRD_CHUNK_RESP: */
799/* T P C H a n d l e r : : R u n C u r l B a s i c */
800/******************************************************************************/
801
802#else
803int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state,
804 TPCLogRecord &rec) {
805 const char *log_prefix = rec.log_prefix.c_str();
806 CURLcode res;
807 res = curl_easy_perform(curl);
808 state.Flush();
809 state.Finalize();
810 if (state.GetErrorCode()) {
811 std::string err = state.GetErrorMessage();
812 if (err.empty()) {err = "(no error message provided)";}
813 else {std::replace(err.begin(), err.end(), '\n', ' ');}
814 std::stringstream ss2;
815 ss2 << "Error when interacting with local filesystem: " << err;
816 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
817 ss << "failure: " << ss2.str();
818 } else if (res == CURLE_HTTP_RETURNED_ERROR) {
819 m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res));
820 return req.SendSimpleResp(500, NULL, NULL,
821 const_cast<char *>(curl_easy_strerror(res)), 0);
822 } else if (state.GetStatusCode() >= 400) {
823 std::stringstream ss;
824 ss << "Remote side failed with status code " << state.GetStatusCode();
825 m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
826 return req.SendSimpleResp(500, NULL, NULL,
827 const_cast<char *>(ss.str().c_str()), 0);
828 } else if (res) {
829 m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res));
830 char msg[] = "Unknown internal transfer failure";
831 return req.SendSimpleResp(500, NULL, NULL, msg, 0);
832 } else {
833 char msg[] = "Created";
834 rec.status = 0;
835 return req.SendSimpleResp(201, NULL, NULL, msg, 0);
836 }
837}
838#endif
839
840/******************************************************************************/
841/* T P C H a n d l e r : : P r o c e s s P u s h R e q */
842/******************************************************************************/
843
844int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
845 TPCLogRecord rec(req);
846 rec.log_prefix = "PushRequest";
847 rec.local = req.resource;
848 rec.remote = resource;
849 rec.m_log = &m_log;
850 char *name = req.GetSecEntity().name;
851 req.GetClientID(rec.clID);
852 if (name) rec.name = name;
853 logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
854
855 ManagedCurlHandle curlPtr(curl_easy_init());
856 auto curl = curlPtr.get();
857 if (!curl) {
858 std::stringstream ss;
859 ss << "Failed to initialize internal transfer resources";
860 rec.status = 500;
861 logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
862 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
863 }
864 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
865 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
866// curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_setcloexec_callback);
867
868 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
869 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
870 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
871 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
872 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
873 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
874 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
875 std::string redirect_resource = req.resource;
876 if (query_header != req.headers.end()) {
877 redirect_resource = query_header->second;
878 }
879
880 AtomicBeg(m_monid_mutex);
881 uint64_t file_monid = AtomicInc(m_monid);
882 AtomicEnd(m_monid_mutex);
883 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
884 if (!fh.get()) {
885 rec.status = 500;
886 std::stringstream ss;
887 ss << "Failed to initialize internal transfer file handle";
888 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
889 ss.str());
890 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
891 }
892 std::string full_url = prepareURL(req);
893
894 std::string authz = GetAuthz(req);
895
896 int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
897 req.GetSecEntity(), authz);
898 if (SFS_REDIRECT == open_results) {
899 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
900 return result;
901 } else if (SFS_OK != open_results) {
902 int code;
903 std::stringstream ss;
904 const char *msg = fh->error.getErrText(code);
905 if (msg == NULL) ss << "Failed to open local resource";
906 else ss << msg;
907 rec.status = 400;
908 if (code == EACCES) rec.status = 401;
909 else if (code == EEXIST) rec.status = 412;
910 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
911 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
912 fh->close();
913 return resp_result;
914 }
915 ConfigureCurlCA(curl);
916 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
917
918 Stream stream(std::move(fh), 0, 0, m_log);
919 State state(0, stream, curl, true, req.tpcForwardCreds);
920 state.CopyHeaders(req);
921
922#ifdef XRD_CHUNK_RESP
923 return RunCurlWithUpdates(curl, req, state, rec);
924#else
925 return RunCurlBasic(curl, req, state, rec);
926#endif
927}
928
929/******************************************************************************/
930/* T P C H a n d l e r : : P r o c e s s P u l l R e q */
931/******************************************************************************/
932
933int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
934 TPCLogRecord rec(req);
935 rec.log_prefix = "PullRequest";
936 rec.local = req.resource;
937 rec.remote = resource;
938 rec.m_log = &m_log;
939 char *name = req.GetSecEntity().name;
940 req.GetClientID(rec.clID);
941 if (name) rec.name = name;
942 logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
943
944 ManagedCurlHandle curlPtr(curl_easy_init());
945 auto curl = curlPtr.get();
946 if (!curl) {
947 std::stringstream ss;
948 ss << "Failed to initialize internal transfer resources";
949 rec.status = 500;
950 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
951 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
952 }
953 // ddavila 2023-01-05:
954 // The following change was required by the Rucio/SENSE project where
955 // multiple IP addresses, each from a different subnet, are assigned to a
956 // single server and routed differently by SENSE.
957 // The above requires the server to utilize the same IP, that was used to
958 // start the TPC, for the resolution of the given TPC instead of
959 // using any of the IPs available.
960 if (m_fixed_route){
961 XrdNetAddr *nP;
962 int numIP = 0;
963 char buff[1024];
964 char * ip;
965
966 // Get the hostname used to contact the server from the http header
967 auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
968 std::string host_used;
969 if (host_header != req.headers.end()) {
970 host_used = host_header->second;
971 }
972
973 // Get the IP addresses associated with the above hostname
974 XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
975 int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
976 ip = (char *)malloc(ip_size-1);
977
978 // Substring to get only the address, remove brackets and garbage
979 memcpy(ip, buff+1, ip_size-2);
980 ip[ip_size-2]='\0';
981 logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
982
983 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
984 }
985 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
986 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
987// curl_easy_setopt(curl,CURLOPT_SOCKOPTFUNCTION,sockopt_setcloexec_callback);
988 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
989 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
990 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
991 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
992 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
993 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
994 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
995 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
996 if (!fh.get()) {
997 std::stringstream ss;
998 ss << "Failed to initialize internal transfer file handle";
999 rec.status = 500;
1000 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1001 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1002 }
1003 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1004 std::string redirect_resource = req.resource;
1005 if (query_header != req.headers.end()) {
1006 redirect_resource = query_header->second;
1007 }
1009 auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1010 if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1011 if (! usingEC) mode = SFS_O_TRUNC;
1012 }
1013 int streams = 1;
1014 {
1015 auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1016 if (streams_header != req.headers.end()) {
1017 int stream_req = -1;
1018 try {
1019 stream_req = std::stol(streams_header->second);
1020 } catch (...) { // Handled below
1021 }
1022 if (stream_req < 0 || stream_req > 100) {
1023 std::stringstream ss;
1024 ss << "Invalid request for number of streams";
1025 rec.status = 400;
1026 logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1027 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1028 }
1029 streams = stream_req == 0 ? 1 : stream_req;
1030 }
1031 }
1032 rec.streams = streams;
1033 bool hasSetOpaque = false;
1034 std::string full_url = prepareURL(req, hasSetOpaque);
1035 std::string authz = GetAuthz(req);
1036 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1037 ConfigureCurlCA(curl);
1038#ifdef XRD_CHUNK_RESP
1039 {
1040 //Get the content-length of the source file and pass it to the OSS layer
1041 //during the open
1042 uint64_t sourceFileContentLength = 0;
1043 bool success;
1044 GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
1045 if(success) {
1046 //In the case we cannot get the information from the source server (offline or other error)
1047 //we just don't add the size information to the opaque of the local file to open
1048 full_url += hasSetOpaque ? "&" : "?";
1049 full_url += "oss.asize=" + std::to_string(sourceFileContentLength);
1050 }
1051 }
1052#endif
1053 int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1054 0644 | SFS_O_MKPTH,
1055 req.GetSecEntity(), authz);
1056 if (SFS_REDIRECT == open_result) {
1057 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1058 return result;
1059 } else if (SFS_OK != open_result) {
1060 int code;
1061 std::stringstream ss;
1062 const char *msg = fh->error.getErrText(code);
1063 if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1064 else ss << msg;
1065 rec.status = 400;
1066 if (code == EACCES) rec.status = 401;
1067 else if (code == EEXIST) rec.status = 412;
1068 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1069 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1070 generateClientErr(ss, rec).c_str(), 0);
1071 fh->close();
1072 return resp_result;
1073 }
1074 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1075 State state(0, stream, curl, false, req.tpcForwardCreds);
1076 state.CopyHeaders(req);
1077
1078#ifdef XRD_CHUNK_RESP
1079 if (streams > 1) {
1080 return RunCurlWithStreams(req, state, streams, rec);
1081 } else {
1082 return RunCurlWithUpdates(curl, req, state, rec);
1083 }
1084#else
1085 return RunCurlBasic(curl, req, state, rec);
1086#endif
1087}
1088
1089/******************************************************************************/
1090/* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1091/******************************************************************************/
1092
1093void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1094 const std::string &event, const std::string &message)
1095{
1096 if (!(m_log.getMsgMask() & mask)) {return;}
1097
1098 std::stringstream ss;
1099 ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1100 if (rec.name.empty())
1101 ss << ", user=(anonymous)";
1102 else
1103 ss << ", user=" << rec.name;
1104 if (rec.streams != 1)
1105 ss << ", streams=" << rec.streams;
1106 if (rec.bytes_transferred >= 0)
1107 ss << ", bytes_transferred=" << rec.bytes_transferred;
1108 if (rec.status >= 0)
1109 ss << ", status=" << rec.status;
1110 if (rec.tpc_status >= 0)
1111 ss << ", tpc_status=" << rec.tpc_status;
1112 if (!message.empty())
1113 ss << "; " << message;
1114 m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1115}
1116
1117std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1118 std::stringstream ssret;
1119 ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1120 if(cCode != CURLcode::CURLE_OK) {
1121 ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1122 }
1123 return ssret.str();
1124}
1125/******************************************************************************/
1126/* X r d H t t p G e t E x t H a n d l e r */
1127/******************************************************************************/
1128
1129extern "C" {
1130
1131XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1132 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1133 log->Emsg("TPCInitialize", "libcurl failed to initialize");
1134 return NULL;
1135 }
1136
1137 TPCHandler *retval{NULL};
1138 if (!config) {
1139 log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1140 return NULL;
1141 }
1142 try {
1143 log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1144 retval = new TPCHandler(log, config, myEnv);
1145 } catch (std::runtime_error &re) {
1146 log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1147 //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1148 }
1149 return retval;
1150}
1151
1152}
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
char * quote(const char *str)
#define close(a)
Definition XrdPosix.hh:43
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
if(ec< 0) ec
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
void CURL
static std::string PrepareURL(const std::string &input)
Definition XrdTpcTPC.cc:257
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
Definition XrdTpcTPC.cc:191
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
int GetStatusCode() const
off_t BytesTransferred() const
void CopyHeaders(XrdHttpExtReq &req)
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
off_t GetContentLength() const
void SetErrorCode(int error_code)
bool Finalize()
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
Definition XrdTpcTPC.cc:304
virtual int ProcessReq(XrdHttpExtReq &req)
Definition XrdTpcTPC.cc:268
virtual ~TPCHandler()
Definition XrdTpcTPC.cc:296
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
Definition XrdTpcTPC.cc:249
std::string clienthost
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
char * name
Entity's name.
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
bool atLeastOneValidCRLFound() const
std::string CAFilename() const
std::string CRLFilename() const
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap, bool &hasSetOpaque)
void Report(TpcInfo &info)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
Definition XrdTpcTPC.hh:39
LogMask
Definition XrdTpcTPC.hh:27
@ Info
Definition XrdTpcTPC.hh:29
@ Error
Definition XrdTpcTPC.hh:31
@ Debug
Definition XrdTpcTPC.hh:28
@ Warning
Definition XrdTpcTPC.hh:30
void operator()(CURL *curl)
Definition XrdTpcTPC.cc:84
static const int uIPv64
ucap: Supports only IPv4 info