16 #include <epicsTime.h> 17 #include <epicsThread.h> 18 #include <epicsVersion.h> 19 #include <epicsAtomic.h> 21 #include <pv/byteBuffer.h> 22 #include <pv/pvType.h> 26 #include <pv/likely.h> 28 #include <pv/pvaConstants.h> 29 #include <pv/remote.h> 30 #include <pv/security.h> 31 #include <pv/transportRegistry.h> 32 #include <pv/introspectionRegistry.h> 33 #include <pv/inetAddressUtil.h> 46 # if __cplusplus>=201103L
53 # if __cplusplus>=201103L
// NOTE(review): this text is a listing-style extraction — each line carries the original file's
// line number as a leading token, and some lines (braces, declarations) are absent from this view.
// OVERRIDE is defined as the `override` keyword; the `# if __cplusplus>=201103L` line just above
// is the guard for it.
// AtomicValue constructor: initializes the stored value `val` to 0.
54 # define OVERRIDE override 73 AtomicValue() :val(0) {}
// getAndSet(): installs `newval` using the retry loop below.
// NOTE(review): presumably returns the previous value — the return statement is not visible here; confirm.
74 inline T getAndSet(T newval)
// Retry loop: read `val` into `oldval`, then call compareAndSwap(val, oldval, newval); the loop
// repeats for as long as compareAndSwap's result differs from `oldval`.
79 oldval = epics::atomic::get(val);
80 }
while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
// Returns the value currently read by epics::atomic::get(val). The signature of the enclosing
// function is not visible in this excerpt.
84 return epics::atomic::get(val);
// AtomicValue<bool>: bool flavour of AtomicValue that keeps its state in an AtomicValue<int>
// member (`realval`) and converts between bool and int at the interface.
// NOTE(review): braces and some declarations are missing from this excerpt.
89 class AtomicValue<
bool>
91 AtomicValue<
int> realval;
// getAndSet(): stores 1 for true / 0 for false in `realval` and reports whether the value
// returned by realval.getAndSet() was non-zero.
93 inline bool getAndSet(
bool newval)
95 return this->realval.getAndSet(newval?1:0)!=0;
// Returns realval.get() collapsed to a bool via double negation. The enclosing function's
// signature is not visible here.
98 return !!
this->realval.get();
/**
 * Exception type derived from std::runtime_error.
 * NOTE(review): presumably raised on transport I/O failures — the throw sites are not
 * visible in this excerpt; confirm against callers.
 */
class io_exception: public std::runtime_error {
public:
    /**
     * @param s message text; forwarded to std::runtime_error, so it is what what() returns.
     */
    explicit io_exception(const std::string &s): std::runtime_error(s) {}
};
/**
 * Exception type derived from std::runtime_error.
 * NOTE(review): presumably signals a malformed/invalid incoming data stream — the throw
 * sites are not visible in this excerpt; confirm against callers.
 */
class invalid_data_stream_exception: public std::runtime_error {
public:
    /**
     * @param s message text; forwarded to std::runtime_error, so it is what what() returns.
     */
    explicit invalid_data_stream_exception(
        const std::string &s): std::runtime_error(s) {}
};
/**
 * Exception type derived from std::runtime_error.
 * NOTE(review): presumably signals that the connection was closed — the throw sites are
 * not visible in this excerpt; confirm against callers.
 */
class connection_closed_exception: public std::runtime_error {
public:
    /**
     * @param s message text; forwarded to std::runtime_error, so it is what what() returns.
     */
    explicit connection_closed_exception(const std::string &s): std::runtime_error(s) {}
};
/**
 * Read modes. Unscoped enum; the enumerators take the values 0, 1, 2 in declaration order.
 * NOTE(review): the exact meaning of each mode is not visible in this excerpt —
 * presumably whole-message, split and segmented reads; confirm against the implementation.
 */
enum ReadMode {
    NORMAL,
    SPLIT,
    SEGMENTED
};
/**
 * Write modes. Unscoped enum; the enumerators take the values 0 and 1 in declaration order.
 * NOTE(review): the exact meaning of each mode is not visible in this excerpt —
 * presumably "drain the send queue" vs. "wait for a ready signal"; confirm against the implementation.
 */
enum WriteMode {
    PROCESS_SEND_QUEUE,
    WAIT_FOR_READY_SIGNAL
};
// AbstractCodec: abstract codec base class. Derives from TransportSendControl (any further base
// classes are not visible in this excerpt) and leaves the socket-level operations to subclasses
// through the pure virtual functions declared below.
// NOTE(review): braces, access specifiers and several declarations are missing from this view.
127 class epicsShareClass AbstractCodec :
128 public TransportSendControl,
// Class-wide size limits, declared here without initializers.
133 static const std::size_t MAX_MESSAGE_PROCESS;
134 static const std::size_t MAX_MESSAGE_SEND;
135 static const std::size_t MAX_ENSURE_SIZE;
136 static const std::size_t MAX_ENSURE_DATA_SIZE;
137 static const std::size_t MAX_ENSURE_BUFFER_SIZE;
138 static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
// Tail of a parameter list — presumably the constructor's; the start of the signature is not visible.
142 size_t sendBufferSize,
143 size_t receiveBufferSize,
144 int32_t socketSendBufferSize,
145 bool blockingProcessQueue);
// Pure virtual hooks every concrete codec must implement.
147 virtual void processControlMessage() = 0;
148 virtual void processApplicationMessage() = 0;
149 virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
150 virtual void invalidDataStreamHandler() = 0;
151 virtual void readPollOne()=0;
152 virtual void writePollOne() = 0;
153 virtual void scheduleSend() = 0;
154 virtual void sendCompleted() = 0;
155 virtual bool terminated() = 0;
156 virtual int write(epics::pvData::ByteBuffer* src) = 0;
157 virtual int read(epics::pvData::ByteBuffer* dst) = 0;
158 virtual bool isOpen() = 0;
// Virtual destructor (body not visible).
161 virtual ~AbstractCodec()
// Overrides marked OVERRIDE FINAL cannot be re-overridden by subclasses when FINAL expands to `final`.
165 virtual void alignBuffer(std::size_t alignment)
OVERRIDE FINAL;
167 virtual void alignData(std::size_t alignment)
OVERRIDE FINAL;
// startMessage(): `ensureCapacity` defaults to 0; the rest of the parameter list is not visible.
168 virtual void startMessage(
169 epics::pvData::int8 command,
170 std::size_t ensureCapacity = 0,
172 void putControlMessage(
173 epics::pvData::int8 command,
174 epics::pvData::int32 data);
178 virtual void flush(
bool lastMessageCompleted)
OVERRIDE FINAL;
181 void processSendQueue();
// Two enqueueSendRequest overloads: the virtual override, and a non-virtual one that also takes
// a required buffer size.
182 virtual void enqueueSendRequest(TransportSender::shared_pointer
const & sender)
OVERRIDE FINAL;
183 void enqueueSendRequest(TransportSender::shared_pointer
const & sender,
184 std::size_t requiredBufferSize);
185 void setSenderThread();
186 virtual void setRecipient(osiSockAddr
const & sendTo)
OVERRIDE FINAL;
189 static std::size_t alignedValue(std::size_t value, std::size_t alignment);
// directSerialize / directDeserialize are OVERRIDE but not FINAL; parameter names are omitted
// and part of each parameter list is not visible.
191 virtual bool directSerialize(
192 epics::pvData::ByteBuffer * ,
194 std::size_t , std::size_t )
OVERRIDE;
197 virtual bool directDeserialize(epics::pvData::ByteBuffer * ,
199 std::size_t , std::size_t )
OVERRIDE;
// True when _sendQueue reports empty().
201 bool sendQueueEmpty()
const {
202 return _sendQueue.empty();
// getRevision(): under a guard on _mutex, picks this side's protocol revision from
// _clientServerFlag (server revision when the flag is non-zero, client revision otherwise)
// and returns the smaller of that and _version.
205 epics::pvData::int8 getRevision()
const {
206 epicsGuard<epicsMutex> G(_mutex);
207 int8_t myver = _clientServerFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
208 return myver < _version ? myver : _version;
213 virtual void sendBufferFull(
int tries) = 0;
214 void send(epics::pvData::ByteBuffer *buffer);
215 void flushSendBuffer();
// Default implementation has an empty body; `ena` is ignored here.
217 virtual void setRxTimeout(
bool ena) {}
// Data members (access level not visible in this excerpt).
223 int32_t _payloadSize;
224 epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
227 epicsThreadId _senderThread;
228 WriteMode _writeMode;
231 epics::pvData::ByteBuffer _socketBuffer;
232 epics::pvData::ByteBuffer _sendBuffer;
234 fair_queue<TransportSender> _sendQueue;
// Internal read/write processing helpers.
238 void processHeader();
239 void processReadNormal();
240 void postProcessApplicationMessage();
241 void processReadSegmented();
242 bool readToBuffer(std::size_t requiredBytes,
bool persistent);
243 void endMessage(
bool hasMoreSegments);
// Tail of a declaration taking a TransportSender; its name and return type are not visible.
245 epics::pvAccess::TransportSender::shared_pointer
const & sender);
247 std::size_t _storedPayloadSize;
248 std::size_t _storedPosition;
249 std::size_t _storedLimit;
250 std::size_t _startPosition;
252 const std::size_t _maxSendPayloadSize;
253 std::size_t _lastMessageStartPosition;
254 std::size_t _lastSegmentedMessageType;
255 int8_t _lastSegmentedMessageCommand;
256 std::size_t _nextMessagePayloadOffset;
258 epics::pvData::int8 _byteOrderFlag;
// Read by getRevision() to choose between the server and client protocol revision.
260 const epics::pvData::int8 _clientServerFlag;
// `mutable` so that const members such as getRevision() can lock it.
264 mutable epics::pvData::Mutex _mutex;
// BlockingTCPTransportCodec: codec built on AbstractCodec that also implements
// AuthenticationPluginControl and can hand out shared pointers to itself
// (enable_shared_from_this).
// NOTE(review): braces, access specifiers and several declarations are missing from this view.
268 class BlockingTCPTransportCodec:
269 public AbstractCodec,
270 public AuthenticationPluginControl,
271 public std::tr1::enable_shared_from_this<BlockingTCPTransportCodec>
276 POINTER_DEFINITIONS(BlockingTCPTransportCodec);
// Static counter; NOTE(review): presumably the number of live instances — updates are not visible here.
278 static size_t num_instances;
// Constructor (part of the parameter list is not visible).
280 BlockingTCPTransportCodec(
282 Context::shared_pointer
const & context,
284 ResponseHandler::shared_pointer
const & responseHandler,
285 size_t sendBufferSize,
286 size_t receiveBufferSize,
287 epics::pvData::int16 priority);
288 virtual ~BlockingTCPTransportCodec();
298 virtual void start();
300 virtual int read(epics::pvData::ByteBuffer* dst)
OVERRIDE FINAL;
301 virtual int write(epics::pvData::ByteBuffer* src)
OVERRIDE FINAL;
// Returns the address of the _socketAddress member.
302 virtual const osiSockAddr* getLastReadBufferSocketAddress()
OVERRIDE FINAL {
303 return &_socketAddress;
// Returns the string "tcp"; the enclosing function's signature is not visible in this excerpt.
308 return std::string(
"tcp");
// Control-message handling (enclosing function not visible): for CMD_SET_ENDIANESS the byte
// order is set to big-endian when _flags is negative, little-endian otherwise.
312 if (_command == CMD_SET_ENDIANESS)
315 setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
// Hands the received message (version, command, payload size, socket buffer) to the response
// handler, passing this transport as a shared pointer. Enclosing function not visible.
321 _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
322 _version, _command, _payloadSize, &_socketBuffer);
326 virtual const osiSockAddr& getRemoteAddress()
const OVERRIDE FINAL {
327 return _socketAddress;
330 virtual const std::string& getRemoteName()
const OVERRIDE FINAL {
// Receive buffer size is the size of _socketBuffer.
335 virtual std::size_t getReceiveBufferSize()
const OVERRIDE FINAL {
336 return _socketBuffer.getSize();
340 virtual epics::pvData::int16 getPriority()
const OVERRIDE FINAL {
// Plain setters: each stores its argument in the corresponding member.
345 virtual void setRemoteTransportReceiveBufferSize(
346 std::size_t remoteTransportReceiveBufferSize)
OVERRIDE FINAL {
347 _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
351 virtual void setRemoteTransportSocketReceiveBufferSize(
353 _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
// cachedDeserialize(): delegates to the incoming introspection registry, passing `this`.
357 std::tr1::shared_ptr<
const epics::pvData::Field>
358 virtual cachedDeserialize(epics::pvData::ByteBuffer* buffer)
OVERRIDE FINAL 360 return _incomingIR.deserialize(buffer,
this);
// cachedSerialize(): delegates to the outgoing introspection registry, passing `this`.
364 virtual void cachedSerialize(
365 const std::tr1::shared_ptr<
const epics::pvData::Field>& field,
368 _outgoingIR.serialize(field, buffer,
this);
// Installs this transport (as a shared pointer) in the context's transport registry.
// Enclosing function not visible in this excerpt.
381 Transport::shared_pointer thisSharedPtr = shared_from_this();
382 _context->getTransportRegistry()->install(thisSharedPtr);
// verify()/verified() are OVERRIDE but not FINAL here; both derived codecs below override verified().
387 virtual bool verify(epics::pvData::int32 timeoutMs)
OVERRIDE;
389 virtual void verified(epics::pvData::Status
const & status)
OVERRIDE;
391 virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer
const & data)
OVERRIDE FINAL;
393 virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer
const & data)
OVERRIDE FINAL;
396 void receiveThread();
408 virtual void internalClose();
// Data members (access level not visible in this excerpt).
411 AtomicValue<
bool> _isOpen;
412 epics::pvData::Thread _readThread, _sendThread;
413 const SOCKET _channel;
415 osiSockAddr _socketAddress;
416 std::string _socketName;
418 Context::shared_pointer _context;
// Separate introspection registries for the incoming and outgoing directions.
420 IntrospectionRegistry _incomingIR;
421 IntrospectionRegistry _outgoingIR;
424 std::string _authSessionName;
425 AuthenticationSession::shared_pointer _authSession;
428 PeerInfo::const_shared_pointer _peerInfo;
432 ResponseHandler::shared_pointer _responseHandler;
433 size_t _remoteTransportReceiveBufferSize;
434 epics::pvData::int16 _priority;
438 epics::pvData::Event _verifiedEvent;
// BlockingServerTCPTransportCodec: BlockingTCPTransportCodec that is also a TransportSender and
// keeps a map of ServerChannel objects keyed by pvAccessID.
// NOTE(review): braces, access specifiers and several declarations are missing from this view.
441 class BlockingServerTCPTransportCodec :
442 public BlockingTCPTransportCodec,
443 public TransportSender {
446 POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
// Constructor (part of the parameter list is not visible).
449 BlockingServerTCPTransportCodec(
450 Context::shared_pointer
const & context,
452 ResponseHandler::shared_pointer
const & responseHandler,
453 int32_t sendBufferSize,
454 int32_t receiveBufferSize );
// create(): factory that wraps a new instance in a shared_pointer and calls activate() on it.
// The return statement is not visible in this excerpt.
457 static shared_pointer create(
458 Context::shared_pointer
const & context,
460 ResponseHandler::shared_pointer
const & responseHandler,
462 int receiveBufferSize)
464 shared_pointer thisPointer(
465 new BlockingServerTCPTransportCodec(
466 context, channel, responseHandler,
467 sendBufferSize, receiveBufferSize)
469 thisPointer->activate();
// acquire(): parameter is unnamed; body not visible.
475 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl>
const & )
OVERRIDE FINAL 482 pvAccessID preallocateChannelSID();
// Empty body: the SID argument is ignored.
484 void depreallocateChannelSID(pvAccessID ) {}
486 void registerChannel(
488 std::tr1::shared_ptr<ServerChannel>
const & channel);
490 void unregisterChannel(pvAccessID sid);
492 std::tr1::shared_ptr<ServerChannel> getChannel(pvAccessID sid);
494 void getChannels(std::vector<std::tr1::shared_ptr<ServerChannel> >& channels)
const;
496 size_t getChannelCount()
const;
// verify(): enqueues this object (cast to TransportSender) as a send request, runs the base
// class verify(timeoutMs), enqueues itself a second time, and returns the base class result.
498 virtual bool verify(epics::pvData::int32 timeoutMs)
OVERRIDE FINAL {
500 TransportSender::shared_pointer transportSender =
501 std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
502 enqueueSendRequest(transportSender);
504 bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
506 enqueueSendRequest(transportSender);
508 return verifiedStatus;
// verified(): records `status` in _verificationStatus with a guard on _mutex constructed first,
// then forwards to the base class verified().
511 virtual void verified(epics::pvData::Status
const & status)
OVERRIDE FINAL {
513 epicsGuard<epicsMutex> G(_mutex);
514 _verificationStatus = status;
516 BlockingTCPTransportCodec::verified(status);
519 void authNZInitialize(
const std::string& securityPluginName,
520 const epics::pvData::PVStructure::shared_pointer& data);
// The remaining parameters of the next two declarations are not visible in this excerpt.
522 virtual void authenticationCompleted(epics::pvData::Status
const & status,
525 virtual void send(epics::pvData::ByteBuffer* buffer,
532 void destroyAllChannels();
// Data members (access level not visible in this excerpt).
540 pvAccessID _lastChannelSID;
// Channel table: pvAccessID -> shared ServerChannel.
542 typedef std::map<pvAccessID, std::tr1::shared_ptr<ServerChannel> > _channels_t;
546 _channels_t _channels;
// `mutable` so const members can lock it; NOTE(review): presumably guards _channels — confirm.
548 mutable epics::pvData::Mutex _channelsMutex;
// Written by verified().
550 epics::pvData::Status _verificationStatus;
552 bool _verifyOrVerified;
554 std::vector<std::string> advertisedAuthPlugins;
// BlockingClientTCPTransportCodec: BlockingTCPTransportCodec that is also a TransportSender and
// a TimerCallback, and tracks its owning client channels through weak pointers.
// NOTE(review): braces, access specifiers and several declarations are missing from this view.
558 class BlockingClientTCPTransportCodec :
559 public BlockingTCPTransportCodec,
560 public TransportSender,
561 public epics::pvData::TimerCallback {
564 POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
// Constructor (part of the parameter list is not visible).
567 BlockingClientTCPTransportCodec(
568 Context::shared_pointer
const & context,
570 ResponseHandler::shared_pointer
const & responseHandler,
571 int32_t sendBufferSize,
572 int32_t receiveBufferSize,
573 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
574 epics::pvData::int8 remoteTransportRevision,
575 float heartbeatInterval,
// create(): factory that wraps a new instance in a shared_pointer, forwarding the arguments to
// the constructor, and calls activate() on it. The return statement is not visible in this excerpt.
579 static shared_pointer create(
580 Context::shared_pointer
const & context,
582 ResponseHandler::shared_pointer
const & responseHandler,
583 int32_t sendBufferSize,
584 int32_t receiveBufferSize,
585 std::tr1::shared_ptr<ClientChannelImpl>
const & client,
586 int8_t remoteTransportRevision,
587 float heartbeatInterval,
590 shared_pointer thisPointer(
591 new BlockingClientTCPTransportCodec(
592 context, channel, responseHandler,
593 sendBufferSize, receiveBufferSize,
594 client, remoteTransportRevision,
595 heartbeatInterval, priority)
597 thisPointer->activate();
613 virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl>
const & client)
OVERRIDE FINAL;
// The remaining parameters of send() are not visible in this excerpt.
617 virtual void send(epics::pvData::ByteBuffer* buffer,
// Client-side authNZInitialize takes the list of security plug-ins offered to it.
620 void authNZInitialize(
const std::vector<std::string>& offeredSecurityPlugins);
622 virtual void authenticationCompleted(epics::pvData::Status
const & status,
625 virtual void verified(epics::pvData::Status
const & status)
OVERRIDE FINAL;
// Owner table: pvAccessID -> weak reference to a ClientChannelImpl.
636 typedef std::map<pvAccessID, std::tr1::weak_ptr<ClientChannelImpl> > TransportClientMap_t;
637 TransportClientMap_t _owners;
// NOTE(review): units are not visible here — presumably seconds; confirm against the constructor.
642 const double _connectionTimeout;
652 void closedNotifyClients();
virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const &data)=0
Pass data to the active security plug-in session.
#define FINAL
Copyright - See the COPYRIGHT that is included with this distribution.