XRootD
Loading...
Searching...
No Matches
XrdClAsyncMsgWriter.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28#include "XrdSys/XrdSysE2T.hh"
29
30#include <memory>
31
32namespace XrdCl
33{
34 //----------------------------------------------------------------------------
36 //----------------------------------------------------------------------------
38 {
39 public:
40 //------------------------------------------------------------------------
48 //------------------------------------------------------------------------
50 Socket &socket,
51 const std::string &strmname,
52 Stream &strm,
53 uint16_t substrmnb,
54 AnyObject &chdata ) : writestage( WriteStart ),
55 xrdTransport( xrdTransport ),
56 socket( socket ),
57 strmname( strmname ),
58 strm( strm ),
59 substrmnb( substrmnb ),
60 chdata( chdata ),
61 outmsg( nullptr ),
62 outmsgsize( 0 ),
63 outhandler( nullptr )
64 {
65 }
66
67 //------------------------------------------------------------------------
69 //------------------------------------------------------------------------
70 inline void Reset() // Return the writer to its initial state so the next message can be picked up
71 {
72 writestage = WriteStart; // restart the state machine from its first stage
73 outmsg = nullptr; // the message is not owned here, so only the pointer is dropped
74 outmsgsize = 0;
75 outhandler = nullptr;
76 outsign.reset(); // owned through unique_ptr: releases the signature, if any
77 }
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85 while( true )
86 {
87 switch( writestage )
88 {
89 //------------------------------------------------------------------
90 // Pick up a message if we're not in process of writing something
91 //------------------------------------------------------------------
92 case WriteStart:
93 {
94 std::pair<Message *, MsgHandler *> toBeSent;
95 toBeSent = strm.OnReadyToWrite( substrmnb );
96 outmsg = toBeSent.first;
97 outhandler = toBeSent.second;
98 if( !outmsg ) return XRootDStatus( stOK, suAlreadyDone );
99
100 outmsg->SetCursor( 0 );
101 outmsgsize = outmsg->GetSize();
102
103 //----------------------------------------------------------------
104 // Secure the message if necessary
105 //----------------------------------------------------------------
106 Message *signature = nullptr;
107 XRootDStatus st = xrdTransport.GetSignature( outmsg, signature, chdata );
108 if( !st.IsOK() ) return st;
109 outsign.reset( signature );
110
111 if( outsign )
112 outmsgsize += outsign->GetSize();
113
114 //----------------------------------------------------------------
115 // The next step is to write the signature
116 //----------------------------------------------------------------
117 writestage = WriteSign;
118 continue;
119 }
120 //------------------------------------------------------------------
121 // First write the signature (if there is one)
122 //------------------------------------------------------------------
123 case WriteSign:
124 {
125 //----------------------------------------------------------------
126 // If there is a signature for the request send it over the socket
127 //----------------------------------------------------------------
128 if( outsign )
129 {
130 XRootDStatus st = socket.Send( *outsign, strmname );
131 if( !st.IsOK() || st.code == suRetry ) return st;
132 }
133 //----------------------------------------------------------------
134 // The next step is to write the request
135 //----------------------------------------------------------------
136 writestage = WriteRequest;
137 continue;
138 }
139 //------------------------------------------------------------------
140 // Then write the request itself
141 //------------------------------------------------------------------
142 case WriteRequest:
143 {
144 XRootDStatus st = socket.Send( *outmsg, strmname );
145 if( !st.IsOK() || st.code == suRetry ) return st;
146 //----------------------------------------------------------------
147 // The next step is to write the raw data (if any)
148 //----------------------------------------------------------------
149 writestage = WriteRawData;
150 continue;
151 }
152 //------------------------------------------------------------------
153 // And then write the raw data (if any)
154 //------------------------------------------------------------------
155 case WriteRawData:
156 {
157 if( outhandler->IsRaw() )
158 {
159 uint32_t wrtcnt = 0;
160 XRootDStatus st = outhandler->WriteMessageBody( &socket, wrtcnt );
161 if( !st.IsOK() || st.code == suRetry ) return st;
162 outmsgsize += wrtcnt;
163 log->Dump( AsyncSockMsg, "[%s] Wrote %d bytes of raw data of message"
164 "(0x%x) body.", strmname.c_str(), wrtcnt, outmsg );
165 }
166 //----------------------------------------------------------------
167 // The next step is to finalize the write operation
168 //----------------------------------------------------------------
169 writestage = WriteDone;
170 continue;
171 }
172 //------------------------------------------------------------------
173 // Finally, finalize the write operation
174 //------------------------------------------------------------------
175 case WriteDone:
176 {
177 XRootDStatus st = socket.Flash();
178 if( !st.IsOK() )
179 {
180 log->Error( AsyncSockMsg, "[%s] Unable to flash the socket: %s",
181 strmname.c_str(), XrdSysE2T( st.errNo ) );
182 return st;
183 }
184
185 log->Dump( AsyncSockMsg, "[%s] Successfully sent message: %s (0x%x).",
186 strmname.c_str(), outmsg->GetObfuscatedDescription().c_str(), outmsg );
187
188 strm.OnMessageSent( substrmnb, outmsg, outmsgsize );
189 return XRootDStatus();
190 }
191 }
192 // just in case ...
193 break;
194 }
195 //----------------------------------------------------------------------
196 // We are done
197 //----------------------------------------------------------------------
198 return XRootDStatus();
199 }
200
201 private:
202
203 //------------------------------------------------------------------------
205 //------------------------------------------------------------------------
206 enum Stage // stages of the write state machine; Write() advances through them in this order
207 {
208 WriteStart, //< the next step is to initialize the write
209 WriteSign, //< the next step is to write the signature
210 WriteRequest, //< the next step is to write the request
211 WriteRawData, //< the next step is to write the raw data
212 WriteDone //< the next step is to finalize the write
213 };
214
215 //------------------------------------------------------------------------
216 // Current write stage
217 //------------------------------------------------------------------------
218 Stage writestage;
219
220 //------------------------------------------------------------------------
221 // The context of the write operation
222 //------------------------------------------------------------------------
223 TransportHandler &xrdTransport;
224 Socket &socket;
225 const std::string &strmname;
226 Stream &strm;
227 uint16_t substrmnb;
228 AnyObject &chdata;
229
230 //------------------------------------------------------------------------
231 // The internal state of the writer
232 //------------------------------------------------------------------------
233 Message *outmsg; //< we don't own the message
234 uint32_t outmsgsize;
235 MsgHandler *outhandler;
236 std::unique_ptr<Message> outsign;
237 };
238
239}
240
241#endif /* SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_ */
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
Utility class encapsulating writing request logic.
XRootDStatus Write()
Write the request into the socket.
AsyncMsgWriter(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb, AnyObject &chdata)
void Reset()
Reset the state of the object (makes it ready to write out the next msg)
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetSize() const
Get the size of the message.
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
virtual XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
XRootDStatus Flash()
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Perform the handshake and the authentication for each physical stream.
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)=0
Get signature for given message.
const uint16_t suRetry
const uint16_t stOK
Everything went OK.
const uint64_t AsyncSockMsg
const uint16_t suAlreadyDone
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.