XRootD
Loading...
Searching...
No Matches
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor.
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg)
 
time_t GetExpiration ()
 Get a timestamp after which we give up.
 
const MessageGetRequest () const
 Get the request pointer.
 
virtual uint16_t GetSid () const
 
virtual uint16_t InspectStatusRsp ()
 
virtual bool IsRaw () const
 Are we a raw writer or not?
 
virtual void OnStatusReady (const Message *message, XRootDStatus status)
 The requested action has been performed and the status is available.
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status)
 
void PartialReceived ()
 
virtual void Process ()
 Process the message if it was "taken" by the examine action.
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead)
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list.
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up.
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list.
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer.
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer.
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter.
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten)
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive.
 
virtual void OnReadyToSend (Message *msg)
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 118 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler )
inline

Constructor

Parameters
msgmessage that has been sent out
respHandlerresponse handler to be called then the final final response arrives
urlthe url the message has been sent to
sidMgrthe sid manager used to allocate SID for the initial message

Definition at line 133 of file XrdClXRootDMsgHandler.hh.

137 :
138 pRequest( msg ),
139 pResponseHandler( respHandler ),
140 pUrl( *url ),
141 pEffectiveDataServerUrl( 0 ),
142 pSidMgr( sidMgr ),
143 pLFileHandler( lFileHandler ),
144 pExpiration( 0 ),
145 pRedirectAsAnswer( false ),
146 pOksofarAsAnswer( false ),
147 pHasLoadBalancer( false ),
148 pHasSessionId( false ),
149 pChunkList( 0 ),
150 pKBuff( 0 ),
151 pRedirectCounter( 0 ),
152 pNotAuthorizedCounter( 0 ),
153
154 pAsyncOffset( 0 ),
155 pAsyncChunkIndex( 0 ),
156
157 pPgWrtCksumBuff( 4 ),
158 pPgWrtCurrentPageOffset( 0 ),
159 pPgWrtCurrentPageNb( 0 ),
160
161 pOtherRawStarted( false ),
162
163 pFollowMetalink( false ),
164
165 pStateful( false ),
166
167 pAggregatedWaitTime( 0 ),
168
169 pMsgInFly( false ),
170
171 pTimeoutFence( false ),
172
173 pDirListStarted( false ),
174 pDirListWithStat( false ),
175
176 pCV( 0 ),
177
178 pSslErrCnt( 0 )
179 {
180 pPostMaster = DefaultEnv::GetPostMaster();
181 if( msg->GetSessionId() )
182 pHasSessionId = true;
183
184 Log *log = DefaultEnv::GetLog();
185 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
186 pUrl.GetHostId().c_str(), this,
187 pRequest->GetDescription().c_str() );
188
189 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
190 if( ntohs( hdr->requestid ) == kXR_pgread )
191 {
192 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
193 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
194 ntohl( pgrdreq->rlen ) ) );
195 }
196
197 if( ntohs( hdr->requestid ) == kXR_readv )
198 pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
199 else if( ntohs( hdr->requestid ) == kXR_read )
200 pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
201 else
202 pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
203 }
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetDescription() const
Get the description of the message.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition XrdConfig.cc:111

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 208 of file XrdClXRootDMsgHandler.hh.

209 {
210 DumpRedirectTraceBack();
211
212 if( !pHasSessionId )
213 delete pRequest;
214 delete pEffectiveDataServerUrl;
215
216 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
217 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
218 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
219 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
220 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
221 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
222
223 Log *log = DefaultEnv::GetLog();
224 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
225 pUrl.GetHostId().c_str(), this );
226 }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > & msg)
virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 107 of file XrdClXRootDMsgHandler.cc.

108 {
109 //--------------------------------------------------------------------------
110 // if the MsgHandler is already being used to process another request
111 // (kXR_oksofar) we need to wait
112 //--------------------------------------------------------------------------
113 if( pOksofarAsAnswer )
114 {
115 XrdSysCondVarHelper lck( pCV );
116 while( pResponse ) pCV.Wait();
117 }
118 else
119 {
120 if( pResponse )
121 {
122 Log *log = DefaultEnv::GetLog();
123 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
124 "it already owns a response: 0x%x (message: %s ).",
125 pUrl.GetHostId().c_str(), this,
126 pRequest->GetDescription().c_str() );
127 }
128 }
129
130 if( msg->GetSize() < 8 )
131 return Ignore;
132
133 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
134 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
135 uint16_t status = 0;
136 uint32_t dlen = 0;
137
138 //--------------------------------------------------------------------------
139 // We only care about async responses, but those are extracted now
140 // in the SocketHandler.
141 //--------------------------------------------------------------------------
142 if( rsp->hdr.status == kXR_attn )
143 {
144 return Ignore;
145 }
146 //--------------------------------------------------------------------------
147 // We got a sync message - check if it belongs to us
148 //--------------------------------------------------------------------------
149 else
150 {
151 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
152 rsp->hdr.streamid[1] != req->header.streamid[1] )
153 return Ignore;
154
155 status = rsp->hdr.status;
156 dlen = rsp->hdr.dlen;
157 }
158
159 //--------------------------------------------------------------------------
160 // We take the ownership of the message and decide what we will do
161 // with the handler itself, the options are:
162 // 1) we want to either read in raw mode (the Raw flag) or have the message
163 // body reconstructed for us by the TransportHandler by the time
164 // Process() is called (default, no extra flag)
165 // 2) we either got a full response in which case we don't want to be
166 // notified about anything anymore (RemoveHandler) or we got a partial
167 // answer and we need to wait for more (default, no extra flag)
168 //--------------------------------------------------------------------------
169 pResponse = msg;
170 pBodyReader->SetDataLength( dlen );
171
172 Log *log = DefaultEnv::GetLog();
173 switch( status )
174 {
175 //------------------------------------------------------------------------
176 // Handle the cached cases
177 //------------------------------------------------------------------------
178 case kXR_error:
179 case kXR_redirect:
180 case kXR_wait:
181 return RemoveHandler;
182
183 case kXR_waitresp:
184 {
185 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
186 "message %s", pUrl.GetHostId().c_str(),
187 pRequest->GetDescription().c_str() );
188
189 pResponse.reset();
190 return Ignore; // This must be handled synchronously!
191 }
192
193 //------------------------------------------------------------------------
194 // Handle the potential raw cases
195 //------------------------------------------------------------------------
196 case kXR_ok:
197 {
198 //----------------------------------------------------------------------
199 // For kXR_read we read in raw mode
200 //----------------------------------------------------------------------
201 uint16_t reqId = ntohs( req->header.requestid );
202 if( reqId == kXR_read )
203 {
204 return Raw | RemoveHandler;
205 }
206
207 //----------------------------------------------------------------------
208 // kXR_readv is the same as kXR_read
209 //----------------------------------------------------------------------
210 if( reqId == kXR_readv )
211 {
212 return Raw | RemoveHandler;
213 }
214
215 //----------------------------------------------------------------------
216 // For everything else we just take what we got
217 //----------------------------------------------------------------------
218 return RemoveHandler;
219 }
220
221 //------------------------------------------------------------------------
222 // kXR_oksofars are special, they are not full responses, so we reset
223 // the response pointer to 0 and add the message to the partial list
224 //------------------------------------------------------------------------
225 case kXR_oksofar:
226 {
227 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
228 "%s", pUrl.GetHostId().c_str(),
229 pRequest->GetDescription().c_str() );
230
231 if( !pOksofarAsAnswer )
232 {
233 pPartialResps.emplace_back( std::move( pResponse ) );
234 }
235
236 //----------------------------------------------------------------------
237 // For kXR_read we either read in raw mode if the message has not
238 // been fully reconstructed already, if it has, we adjust
239 // the buffer offset to prepare for the next one
240 //----------------------------------------------------------------------
241 uint16_t reqId = ntohs( req->header.requestid );
242 if( reqId == kXR_read )
243 {
244 pTimeoutFence.store( true, std::memory_order_relaxed );
245 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
246 }
247
248 //----------------------------------------------------------------------
249 // kXR_readv is similar to read, except that the payload is different
250 //----------------------------------------------------------------------
251 if( reqId == kXR_readv )
252 {
253 pTimeoutFence.store( true, std::memory_order_relaxed );
254 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
255 }
256
257 return ( pOksofarAsAnswer ? None : NoProcess );
258 }
259
260 case kXR_status:
261 {
262 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
263 "%s", pUrl.GetHostId().c_str(),
264 pRequest->GetDescription().c_str() );
265
266 uint16_t reqId = ntohs( req->header.requestid );
267 if( reqId == kXR_pgwrite )
268 {
269 //--------------------------------------------------------------------
270 // In case of pgwrite by definition this wont be a partial response
271 // so we can already remove the handler from the in-queue
272 //--------------------------------------------------------------------
273 return RemoveHandler;
274 }
275
276 //----------------------------------------------------------------------
277 // Otherwise (pgread), first of all we need to read the body of the
278 // kXR_status response, we can handle the raw data (if any) only after
279 // we have the whole kXR_status body
280 //----------------------------------------------------------------------
281 pTimeoutFence.store( true, std::memory_order_relaxed );
282 return None;
283 }
284
285 //------------------------------------------------------------------------
286 // Default
287 //------------------------------------------------------------------------
288 default:
289 return RemoveHandler;
290 }
291 return RemoveHandler;
292 }
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:912
@ kXR_waitresp
Definition XProtocol.hh:904
@ kXR_redirect
Definition XProtocol.hh:902
@ kXR_oksofar
Definition XProtocol.hh:898
@ kXR_status
Definition XProtocol.hh:905
@ kXR_ok
Definition XProtocol.hh:897
@ kXR_attn
Definition XProtocol.hh:899
@ kXR_wait
Definition XProtocol.hh:903
@ kXR_error
Definition XProtocol.hh:901
struct ClientRequestHdr header
Definition XProtocol.hh:844
@ kXR_pgwrite
Definition XProtocol.hh:138
ServerResponseHeader hdr
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inlinevirtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 329 of file XrdClXRootDMsgHandler.hh.

330 {
331 return pExpiration;
332 }

◆ GetRequest()

const Message * XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 355 of file XrdClXRootDMsgHandler.hh.

356 {
357 return pRequest;
358 }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 387 of file XrdClXRootDMsgHandler.cc.

388 {
389 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
390 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
391 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 297 of file XrdClXRootDMsgHandler.cc.

298 {
299 if( !pResponse )
300 return 0;
301
302 Log *log = DefaultEnv::GetLog();
303 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
304
305 //--------------------------------------------------------------------------
306 // Additional action is only required for kXR_status
307 //--------------------------------------------------------------------------
308 if( rsp->hdr.status != kXR_status ) return 0;
309
310 //--------------------------------------------------------------------------
311 // Ignore malformed status response
312 //--------------------------------------------------------------------------
313 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
314 {
315 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
316 return Corrupted;
317 }
318
319 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
320 uint16_t reqId = ntohs( req->header.requestid );
321 //--------------------------------------------------------------------------
322 // Unmarshal the status body
323 //--------------------------------------------------------------------------
324 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
325
326 if( !st.IsOK() && st.code == errDataError )
327 {
328 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
329 st.GetErrorMessage().c_str() );
330 return Corrupted;
331 }
332
333 if( !st.IsOK() )
334 {
335 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
336 pUrl.GetHostId().c_str() );
337 pStatus = st;
338 HandleRspOrQueue();
339 return Ignore;
340 }
341
342 //--------------------------------------------------------------------------
343 // Common handling for partial results
344 //--------------------------------------------------------------------------
345 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
347 {
348 pPartialResps.push_back( std::move( pResponse ) );
349 }
350
351 //--------------------------------------------------------------------------
352 // Decide the actions that we need to take
353 //--------------------------------------------------------------------------
354 uint16_t action = 0;
355 if( reqId == kXR_pgread )
356 {
357 //----------------------------------------------------------------------
358 // The message contains only Status header and body but no raw data
359 //----------------------------------------------------------------------
360 if( !pPageReader )
361 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
362 pPageReader->SetRsp( rspst );
363
364 action |= Raw;
365
367 action |= NoProcess;
368 else
369 action |= RemoveHandler;
370 }
371 else if( reqId == kXR_pgwrite )
372 {
373 // if data corruption has been detected on the server side we will
374 // send some additional data pointing to the pages that need to be
375 // retransmitted
376 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
377 pResponse->GetCursor() )
378 action |= More;
379 }
380
381 return action;
382 }
ServerResponseStatus status
struct ServerResponseBody_Status bdy
struct ServerResponseHeader hdr
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
@ kXR_PartialResult

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 927 of file XrdClXRootDMsgHandler.cc.

928 {
929 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
930 uint16_t reqId = ntohs( req->header.requestid );
931 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
932 return true;
933 // checkpoint + execute
934 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
935 {
936 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
937 reqId = ntohs( xeq->header.requestid );
938 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
939 }
940
941 return false;
942 }
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_chkpoint
Definition XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:847

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message * message,
XRootDStatus status )
virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 894 of file XrdClXRootDMsgHandler.cc.

896 {
897 Log *log = DefaultEnv::GetLog();
898
899 //--------------------------------------------------------------------------
900 // We were successful, so we now need to listen for a response
901 //--------------------------------------------------------------------------
902 if( status.IsOK() )
903 {
904 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
905 pUrl.GetHostId().c_str(), message->GetDescription().c_str() );
906
907 log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: 0x%x (message: %s ) from out-queue to in-queue.",
908 pUrl.GetHostId().c_str(), this,
909 pRequest->GetDescription().c_str() );
910
911 pMsgInFly = true;
912 return;
913 }
914
915 //--------------------------------------------------------------------------
916 // We have failed, recover if possible
917 //--------------------------------------------------------------------------
918 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
919 "recover.", pUrl.GetHostId().c_str(),
920 message->GetDescription().c_str() );
921 HandleError( status );
922 }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::ExDbgMsg, XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent event,
XRootDStatus status )
virtual

Handle an event other that a message arrival

Parameters
eventtype of the event
statusstatus info

Reimplemented from XrdCl::MsgHandler.

Definition at line 857 of file XrdClXRootDMsgHandler.cc.

859 {
860 Log *log = DefaultEnv::GetLog();
861 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
862 pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
863
864 if( event == Ready )
865 return 0;
866
867 if( pTimeoutFence.load( std::memory_order_relaxed ) )
868 return 0;
869
870 HandleError( status );
871 return RemoveHandler;
872 }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1104 of file XrdClXRootDMsgHandler.cc.

1105 {
1106 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1107 }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
virtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msgthe message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 396 of file XrdClXRootDMsgHandler.cc.

397 {
398 Log *log = DefaultEnv::GetLog();
399
400 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
401
402 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
403
404 //--------------------------------------------------------------------------
405 // If it is a local file, it can be only a metalink redirector
406 //--------------------------------------------------------------------------
407 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
408 pHosts->back().protocol = kXR_PROTOCOLVERSION;
409
410 //--------------------------------------------------------------------------
411 // We got an answer, check who we were talking to
412 //--------------------------------------------------------------------------
413 else
414 {
415 AnyObject qryResult;
416 int *qryResponse = 0;
417 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
418 qryResult.Get( qryResponse );
419 pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
420 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
421 qryResult.Get( qryResponse );
422 pHosts->back().protocol = *qryResponse; delete qryResponse;
423 }
424
425 //--------------------------------------------------------------------------
426 // Process the message
427 //--------------------------------------------------------------------------
428 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
429 if( !st.IsOK() )
430 {
431 pStatus = Status( stFatal, errInvalidMessage );
432 HandleResponse();
433 return;
434 }
435
436 //--------------------------------------------------------------------------
437 // we have an response for the message so it's not in fly anymore
438 //--------------------------------------------------------------------------
439 pMsgInFly = false;
440
441 //--------------------------------------------------------------------------
442 // Reset the aggregated wait (used to omit wait response in case of Metalink
443 // redirector)
444 //--------------------------------------------------------------------------
445 if( rsp->hdr.status != kXR_wait )
446 pAggregatedWaitTime = 0;
447
448 switch( rsp->hdr.status )
449 {
450 //------------------------------------------------------------------------
451 // kXR_ok - we're done here
452 //------------------------------------------------------------------------
453 case kXR_ok:
454 {
455 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
456 pUrl.GetHostId().c_str(),
457 pRequest->GetDescription().c_str() );
458 pStatus = Status();
459 HandleResponse();
460 return;
461 }
462
463 case kXR_status:
464 {
465 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
466 pUrl.GetHostId().c_str(),
467 pRequest->GetDescription().c_str() );
468 pStatus = Status();
469 HandleResponse();
470 return;
471 }
472
473 //------------------------------------------------------------------------
474 // kXR_ok - we're serving partial result to the user
475 //------------------------------------------------------------------------
476 case kXR_oksofar:
477 {
478 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
479 pUrl.GetHostId().c_str(),
480 pRequest->GetDescription().c_str() );
481 pStatus = Status( stOK, suContinue );
482 HandleResponse();
483 return;
484 }
485
486 //------------------------------------------------------------------------
487 // kXR_error - we've got a problem
488 //------------------------------------------------------------------------
489 case kXR_error:
490 {
491 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
492 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
493 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
494 "[%d] %s", pUrl.GetHostId().c_str(),
495 pRequest->GetDescription().c_str(), rsp->body.error.errnum,
496 errmsg );
497 delete [] errmsg;
498
499 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
500 return;
501 }
502
503 //------------------------------------------------------------------------
504 // kXR_redirect - they tell us to go elsewhere
505 //------------------------------------------------------------------------
506 case kXR_redirect:
507 {
508 if( rsp->hdr.dlen <= 4 )
509 {
510 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
511 pUrl.GetHostId().c_str() );
512 pStatus = Status( stError, errInvalidResponse );
513 HandleResponse();
514 return;
515 }
516
517 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
518 urlInfoBuff[rsp->hdr.dlen-4] = 0;
519 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
520 std::string urlInfo = urlInfoBuff;
521 delete [] urlInfoBuff;
522 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
523 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
524 pRequest->GetDescription().c_str(), urlInfo.c_str(),
525 rsp->body.redirect.port );
526
527 //----------------------------------------------------------------------
528 // Check if we can proceed
529 //----------------------------------------------------------------------
530 if( !pRedirectCounter )
531 {
532 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
533 "message %s, the last known error is: %s",
534 pUrl.GetHostId().c_str(),
535 pRequest->GetDescription().c_str(),
536 pLastError.ToString().c_str() );
537
538
539 pStatus = Status( stFatal, errRedirectLimit );
540 HandleResponse();
541 return;
542 }
543 --pRedirectCounter;
544
545 //----------------------------------------------------------------------
546 // Keep the info about this server if we still need to find a load
547 // balancer
548 //----------------------------------------------------------------------
549 uint32_t flags = pHosts->back().flags;
550 if( !pHasLoadBalancer )
551 {
552 if( flags & kXR_isManager )
553 {
554 //------------------------------------------------------------------
555 // If the current server is a meta manager then it supersedes
556 // any existing load balancer, otherwise we assign a load-balancer
557 // only if it has not been already assigned
558 //------------------------------------------------------------------
559 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
560 {
561 pLoadBalancer = pHosts->back();
562 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
563 "as a load-balancer for message %s",
564 pUrl.GetHostId().c_str(),
565 pRequest->GetDescription().c_str() );
566 HostList::iterator it;
567 for( it = pHosts->begin(); it != pHosts->end(); ++it )
568 it->loadBalancer = false;
569 pHosts->back().loadBalancer = true;
570 }
571 }
572 }
573
574 //----------------------------------------------------------------------
575 // If the redirect comes from a data server safe the URL because
576 // in case of a failure we will use it as the effective data server URL
577 // for the tried CGI opaque info
578 //----------------------------------------------------------------------
579 if( flags & kXR_isServer )
580 pEffectiveDataServerUrl = new URL( pHosts->back().url );
581
582 //----------------------------------------------------------------------
583 // Build the URL and check it's validity
584 //----------------------------------------------------------------------
585 std::vector<std::string> urlComponents;
586 std::string newCgi;
587 Utils::splitString( urlComponents, urlInfo, "?" );
588
589 std::ostringstream o;
590
591 o << urlComponents[0];
592 if( rsp->body.redirect.port > 0 )
593 o << ":" << rsp->body.redirect.port << "/";
594 else if( rsp->body.redirect.port < 0 )
595 {
596 //--------------------------------------------------------------------
597 // check if the manager wants to enforce write recovery at himself
598 // (beware we are dealing here with negative flags)
599 //--------------------------------------------------------------------
600 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
601 pHosts->back().flags |= kXR_recoverWrts;
602
603 //--------------------------------------------------------------------
604 // check if the manager wants to collapse the communication channel
605 // (the redirect host is to replace the current host)
606 //--------------------------------------------------------------------
607 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
608 {
609 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
610 pPostMaster->CollapseRedirect( pUrl, url );
611 }
612
613 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
614 {
615 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
616 if( Utils::CheckEC( pRequest, url ) )
617 pRedirectAsAnswer = true;
618 }
619 }
620
621 URL newUrl = URL( o.str() );
622 if( !newUrl.IsValid() )
623 {
624 pStatus = Status( stError, errInvalidRedirectURL );
625 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
626 pUrl.GetHostId().c_str(), urlInfo.c_str() );
627 HandleResponse();
628 return;
629 }
630
631 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
632 newUrl.SetUserName( pUrl.GetUserName() );
633
634 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
635 newUrl.SetPassword( pUrl.GetPassword() );
636
637 //----------------------------------------------------------------------
638 // Forward any "xrd.*" params from the original client request also to
639 // the new redirection url
640 // Also, we need to preserve any "xrdcl.*' as they are important for
641 // our internal workflows.
642 //----------------------------------------------------------------------
643 std::ostringstream ossXrd;
644 const URL::ParamsMap &urlParams = pUrl.GetParams();
645
646 for(URL::ParamsMap::const_iterator it = urlParams.begin();
647 it != urlParams.end(); ++it )
648 {
649 if( it->first.compare( 0, 4, "xrd." ) &&
650 it->first.compare( 0, 6, "xrdcl." ) )
651 continue;
652
653 ossXrd << it->first << '=' << it->second << '&';
654 }
655
656 std::string xrdCgi = ossXrd.str();
657 pRedirectUrl = newUrl.GetURL();
658
659 URL cgiURL;
660 if( urlComponents.size() > 1 )
661 {
662 pRedirectUrl += "?";
663 pRedirectUrl += urlComponents[1];
664 std::ostringstream o;
665 o << "fake://fake:111//fake?";
666 o << urlComponents[1];
667
668 if( urlComponents.size() == 3 )
669 o << '?' << urlComponents[2];
670
671 if (!xrdCgi.empty())
672 {
673 o << '&' << xrdCgi;
674 pRedirectUrl += '&';
675 pRedirectUrl += xrdCgi;
676 }
677
678 cgiURL = URL( o.str() );
679 }
680 else {
681 if (!xrdCgi.empty())
682 {
683 std::ostringstream o;
684 o << "fake://fake:111//fake?";
685 o << xrdCgi;
686 cgiURL = URL( o.str() );
687 pRedirectUrl += '?';
688 pRedirectUrl += xrdCgi;
689 }
690 }
691
692 //----------------------------------------------------------------------
693 // Check if we need to return the URL as a response
694 //----------------------------------------------------------------------
695 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
696 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
697 !newUrl.IsLocalFile() )
698 pRedirectAsAnswer = true;
699
700 if( pRedirectAsAnswer )
701 {
702 pStatus = Status( stError, errRedirect );
703 HandleResponse();
704 return;
705 }
706
707 //----------------------------------------------------------------------
708 // Rewrite the message in a way required to send it to another server
709 //----------------------------------------------------------------------
710 newUrl.SetParams( cgiURL.GetParams() );
711 Status st = RewriteRequestRedirect( newUrl );
712 if( !st.IsOK() )
713 {
714 pStatus = st;
715 HandleResponse();
716 return;
717 }
718
719 //----------------------------------------------------------------------
720 // Make sure we don't change the protocol by accident (root vs roots)
721 //----------------------------------------------------------------------
722 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
723 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
724 newUrl.SetProtocol( "roots" );
725
726 //----------------------------------------------------------------------
727 // Send the request to the new location
728 //----------------------------------------------------------------------
729 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
730 return;
731 }
732
733 //------------------------------------------------------------------------
734 // kXR_wait - we wait, and re-issue the request later
735 //------------------------------------------------------------------------
736 case kXR_wait:
737 {
738 uint32_t waitSeconds = 0;
739
740 if( rsp->hdr.dlen >= 4 )
741 {
742 char *infoMsg = new char[rsp->hdr.dlen-3];
743 infoMsg[rsp->hdr.dlen-4] = 0;
744 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
745 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
746 "message %s: %s", pUrl.GetHostId().c_str(),
747 rsp->body.wait.seconds, pRequest->GetDescription().c_str(),
748 infoMsg );
749 delete [] infoMsg;
750 waitSeconds = rsp->body.wait.seconds;
751 }
752 else
753 {
754 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
755 "message %s", pUrl.GetHostId().c_str(),
756 pRequest->GetDescription().c_str() );
757 }
758
759 pAggregatedWaitTime += waitSeconds;
760
761 // We need a special case if the data node comes from metalink
762 // redirector. In this case it might make more sense to try the
763 // next entry in the Metalink than wait.
764 if( OmitWait( *pRequest, pLoadBalancer.url ) )
765 {
766 int maxWait = DefaultMaxMetalinkWait;
767 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
768 if( pAggregatedWaitTime > maxWait )
769 {
770 UpdateTriedCGI();
771 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
772 return;
773 }
774 }
775
776 //----------------------------------------------------------------------
777 // Some messages require rewriting before they can be sent again
778 // after wait
779 //----------------------------------------------------------------------
780 Status st = RewriteRequestWait();
781 if( !st.IsOK() )
782 {
783 pStatus = st;
784 HandleResponse();
785 return;
786 }
787
788 //----------------------------------------------------------------------
789 // Register a task to resend the message in some seconds, if we still
790 // have time to do that, and report a timeout otherwise
791 //----------------------------------------------------------------------
792 time_t resendTime = ::time(0)+waitSeconds;
793
794 if( resendTime < pExpiration )
795 {
796 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: 0x%x (message: %s ).",
797 pUrl.GetHostId().c_str(), this,
798 pRequest->GetDescription().c_str() );
799
800 TaskManager *taskMgr = pPostMaster->GetTaskManager();
801 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
802 }
803 else
804 {
805 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
806 pUrl.GetHostId().c_str(),
807 pRequest->GetDescription().c_str() );
808 HandleError( Status( stError, errOperationExpired) );
809 }
810 return;
811 }
812
813 //------------------------------------------------------------------------
814 // kXR_waitresp - the response will be returned in some seconds as an
815 // unsolicited message. Currently all messages of this type are handled
816 // one step before in the XrdClStream::OnIncoming as they need to be
817 // processed synchronously.
818 //------------------------------------------------------------------------
819 case kXR_waitresp:
820 {
821 if( rsp->hdr.dlen < 4 )
822 {
823 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
824 pUrl.GetHostId().c_str() );
825 pStatus = Status( stError, errInvalidResponse );
826 HandleResponse();
827 return;
828 }
829
830 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
831 "message %s", pUrl.GetHostId().c_str(),
832 rsp->body.waitresp.seconds,
833 pRequest->GetDescription().c_str() );
834 return;
835 }
836
837 //------------------------------------------------------------------------
838 // Default - unrecognized/unsupported response, declare an error
839 //------------------------------------------------------------------------
840 default:
841 {
842 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
843 "message %s", pUrl.GetHostId().c_str(),
844 rsp->hdr.status, pRequest->GetDescription().c_str() );
845 pStatus = Status( stError, errInvalidResponse );
846 HandleResponse();
847 return;
848 }
849 }
850
851 return;
852 }
#define kXR_isManager
#define kXR_collapseRedir
#define kXR_attrMeta
#define kXR_recoverWrts
#define kXR_isServer
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
#define kXR_ecRedir
union ServerResponse::@1 body
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:451
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:148
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:130
bool IsLocalFile() const
Definition XrdClURL.cc:460
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:239
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:113
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:438
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t suContinue
const uint16_t errRedirect
const uint16_t errInvalidMessage
URL url
URL of the host.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message * msg,
Socket * socket,
uint32_t & bytesRead )
virtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msgthe corresponding message header
socketthe socket to read from
bytesReadnumber of bytes read by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data is needed stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 877 of file XrdClXRootDMsgHandler.cc.

880 {
881 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
882 uint16_t reqId = ntohs( req->header.requestid );
883
884 if( reqId == kXR_pgread )
885 return pPageReader->Read( *socket, bytesRead );
886
887 return pBodyReader->Read( *socket, bytesRead );
888 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList * chunkList)
inline

Set the chunk list.

Definition at line 382 of file XrdClXRootDMsgHandler.hh.

383 {
384 pChunkList = chunkList;
385 if( pBodyReader )
386 pBodyReader->SetChunkList( chunkList );
387 if( chunkList )
388 pChunkStatus.resize( chunkList->size() );
389 else
390 pChunkStatus.clear();
391 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > && crc32cDigests)
inline

Definition at line 393 of file XrdClXRootDMsgHandler.hh.

394 {
395 pCrc32cDigests = std::move( crc32cDigests );
396 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t expiration)
inline

Set a timestamp after which we give up.

Definition at line 321 of file XrdClXRootDMsgHandler.hh.

322 {
323 pExpiration = expiration;
324 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool followMetalink)
inline

Definition at line 414 of file XrdClXRootDMsgHandler.hh.

415 {
416 pFollowMetalink = followMetalink;
417 }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList * hostList)
inline

Set host list.

Definition at line 374 of file XrdClXRootDMsgHandler.hh.

375 {
376 pHosts.reset( hostList );
377 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer * kbuff)
inline

Set the kernel buffer.

Definition at line 401 of file XrdClXRootDMsgHandler.hh.

402 {
403 pKBuff = kbuff;
404 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo & loadBalancer)
inline

Set the load balancer.

Definition at line 363 of file XrdClXRootDMsgHandler.hh.

364 {
365 if( !loadBalancer.url.IsValid() )
366 return;
367 pLoadBalancer = loadBalancer;
368 pHasLoadBalancer = true;
369 }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 347 of file XrdClXRootDMsgHandler.hh.

348 {
349 pOksofarAsAnswer = oksofarAsAnswer;
350 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 338 of file XrdClXRootDMsgHandler.hh.

339 {
340 pRedirectAsAnswer = redirectAsAnswer;
341 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t redirectCounter)
inline

Set the redirect counter.

Definition at line 409 of file XrdClXRootDMsgHandler.hh.

410 {
411 pRedirectCounter = redirectCounter;
412 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool stateful)
inline

Definition at line 419 of file XrdClXRootDMsgHandler.hh.

420 {
421 pStateful = stateful;
422 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t now)

Called after the wait time for kXR_wait has elapsed

Parameters
nowcurrent timestamp

Definition at line 1096 of file XrdClXRootDMsgHandler.cc.

1097 {
1098 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1099 }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket * socket,
uint32_t & bytesWritten )
virtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socketthe socket to read from
bytesWrittennumber of bytes written by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data needs to be written stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 947 of file XrdClXRootDMsgHandler.cc.

949 {
950 //--------------------------------------------------------------------------
951 // First check if it is a PgWrite
952 //--------------------------------------------------------------------------
953 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
954 {
955 //------------------------------------------------------------------------
956 // PgWrite will have just one chunk
957 //------------------------------------------------------------------------
958 ChunkInfo chunk = pChunkList->front();
959 //------------------------------------------------------------------------
960 // Calculate the size of the first and last page (in case the chunk is not
961 // 4KB aligned)
962 //------------------------------------------------------------------------
963 int fLen = 0, lLen = 0;
964 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
965
966 //------------------------------------------------------------------------
967 // Set the crc32c buffer if not ready yet
968 //------------------------------------------------------------------------
969 if( pPgWrtCksumBuff.GetCursor() == 0 )
970 {
971 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
972 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
973 }
974
975 uint32_t btsLeft = chunk.length - pAsyncOffset;
976 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
977 if( pglen > btsLeft ) pglen = btsLeft;
978 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
979
980 while( btsLeft > 0 )
981 {
982 // first write the crc32c digest
983 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
984 {
985 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
986 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
987 int btswrt = 0;
988 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
989 if( !st.IsOK() ) return st;
990 bytesWritten += btswrt;
991 pPgWrtCksumBuff.AdvanceCursor( btswrt );
992 if( st.code == suRetry ) return st;
993 }
994 // then write the raw data (one page)
995 int btswrt = 0;
996 Status st = socket->Send( pgbuf, pglen, btswrt );
997 if( !st.IsOK() ) return st;
998 pgbuf += btswrt;
999 pglen -= btswrt;
1000 btsLeft -= btswrt;
1001 bytesWritten += btswrt;
1002 pAsyncOffset += btswrt; // update the offset to the raw data
1003 if( st.code == suRetry ) return st;
1004 // if we managed to write all the data ...
1005 if( pglen == 0 )
1006 {
1007 // move to the next page
1008 ++pPgWrtCurrentPageNb;
1009 if( pPgWrtCurrentPageNb < nbpgs )
1010 {
1011 // set the digest buffer
1012 pPgWrtCksumBuff.SetCursor( 0 );
1013 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1014 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1015 }
1016 // set the page length
1017 pglen = XrdSys::PageSize;
1018 if( pglen > btsLeft ) pglen = btsLeft;
1019 // reset offset in the current page
1020 pPgWrtCurrentPageOffset = 0;
1021 }
1022 else
1023 // otherwise just adjust the offset in the current page
1024 pPgWrtCurrentPageOffset += btswrt;
1025
1026 }
1027 }
1028 else if( !pChunkList->empty() )
1029 {
1030 size_t size = pChunkList->size();
1031 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1032 {
1033 char *buffer = (char*)(*pChunkList)[i].buffer;
1034 uint32_t size = (*pChunkList)[i].length;
1035 size_t leftToBeWritten = size - pAsyncOffset;
1036
1037 while( leftToBeWritten )
1038 {
1039 int btswrt = 0;
1040 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1041 bytesWritten += btswrt;
1042 if( !st.IsOK() || st.code == suRetry ) return st;
1043 pAsyncOffset += btswrt;
1044 leftToBeWritten -= btswrt;
1045 }
1046 //----------------------------------------------------------------------
1047 // Remember that we have moved to the next chunk, also clear the offset
1048 // within the buffer as we are going to move to a new one
1049 //----------------------------------------------------------------------
1050 ++pAsyncChunkIndex;
1051 pAsyncOffset = 0;
1052 }
1053 }
1054 else
1055 {
1056 Log *log = DefaultEnv::GetLog();
1057
1058 //------------------------------------------------------------------------
1059 // If the socket is encrypted we cannot use a kernel buffer, we have to
1060 // convert to user space buffer
1061 //------------------------------------------------------------------------
1062 if( socket->IsEncrypted() )
1063 {
1064 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1065 pUrl.GetHostId().c_str() );
1066
1067 char *ubuff = 0;
1068 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1069 if( ret < 0 ) return Status( stError, errInternal );
1070 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1071 return WriteMessageBody( socket, bytesWritten );
1072 }
1073
1074 //------------------------------------------------------------------------
1075 // Send the data
1076 //------------------------------------------------------------------------
1077 while( !pKBuff->Empty() )
1078 {
1079 int btswrt = 0;
1080 Status st = socket->Send( *pKBuff, btswrt );
1081 bytesWritten += btswrt;
1082 if( !st.IsOK() || st.code == suRetry ) return st;
1083 }
1084
1085 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1086 pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
1087 }
1088
1089 return Status();
1090 }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const uint16_t suRetry
const uint16_t errInternal
Internal error.
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, WriteMessageBody(), and XrdCl::XRootDMsg.

Referenced by WriteMessageBody().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 120 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: