pvAccessCPP  7.1.1
blockingUDP.h
1 /**
2  * Copyright - See the COPYRIGHT that is included with this distribution.
3  * pvAccessCPP is distributed subject to a Software License Agreement found
4  * in file LICENSE that is included with this distribution.
5  */
6 
7 #ifndef BLOCKINGUDP_H_
8 #define BLOCKINGUDP_H_
9 
10 #ifdef epicsExportSharedSymbols
11 # define blockingUDPEpicsExportSharedSymbols
12 # undef epicsExportSharedSymbols
13 #endif
14 
15 #include <shareLib.h>
16 #include <osiSock.h>
17 #include <epicsThread.h>
18 
19 #include <pv/noDefaultMethods.h>
20 #include <pv/byteBuffer.h>
21 #include <pv/lock.h>
22 #include <pv/event.h>
23 #include <pv/pvIntrospect.h>
24 
25 #ifdef blockingUDPEpicsExportSharedSymbols
26 # define epicsExportSharedSymbols
27 # undef blockingUDPEpicsExportSharedSymbols
28 #endif
29 
30 #include <shareLib.h>
31 
32 #include <pv/remote.h>
33 #include <pv/pvaConstants.h>
34 #include <pv/inetAddressUtil.h>
35 
36 namespace epics {
37 namespace pvAccess {
38 
39 class ClientChannelImpl;
40 class BlockingUDPConnector;
41 
/**
 * Address category selector.
 *
 * Used in this header as the type of the `target` argument of
 * BlockingUDPTransport::send(), where inetAddressType_all is the default.
 */
enum InetAddressType {
    inetAddressType_all                 = 0,
    inetAddressType_unicast             = 1,
    inetAddressType_broadcast_multicast = 2
};
43 
44 class BlockingUDPTransport :
45  public Transport,
46  public TransportSendControl,
47  public epicsThreadRunable
48 {
49  EPICS_NOT_COPYABLE(BlockingUDPTransport)
50 public:
51  POINTER_DEFINITIONS(BlockingUDPTransport);
52 
53  static size_t num_instances;
54 
55 private:
56  std::tr1::weak_ptr<BlockingUDPTransport> internal_this;
57  friend class BlockingUDPConnector;
58  BlockingUDPTransport(bool serverFlag,
59  ResponseHandler::shared_pointer const & responseHandler,
60  SOCKET channel, osiSockAddr &bindAddress,
61  short remoteTransportRevision);
62 public:
63 
64  virtual ~BlockingUDPTransport();
65 
66  virtual bool isClosed() OVERRIDE FINAL {
67  return _closed.get();
68  }
69 
70  virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
71  return _remoteAddress;
72  }
73 
74  virtual const std::string& getRemoteName() const OVERRIDE FINAL {
75  return _remoteName;
76  }
77 
78  virtual std::string getType() const OVERRIDE FINAL {
79  return std::string("udp");
80  }
81 
82  virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
83  return _receiveBuffer.getSize();
84  }
85 
86  virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL {
87  return PVA_DEFAULT_PRIORITY;
88  }
89 
90  virtual void setRemoteTransportReceiveBufferSize(
91  std::size_t /*receiveBufferSize*/) OVERRIDE FINAL {
92  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
93  }
94 
95  virtual void setRemoteTransportSocketReceiveBufferSize(
96  std::size_t /*socketReceiveBufferSize*/) OVERRIDE FINAL {
97  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
98  }
99 
100  virtual bool verify(epics::pvData::int32 /*timeoutMs*/) OVERRIDE FINAL {
101  // noop
102  return true;
103  }
104 
105  virtual void verified(epics::pvData::Status const & /*status*/) OVERRIDE FINAL {
106  // noop
107  }
108 
109  virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL {
110  // noop
111  }
112 
113  // NOTE: this is not yet used for UDP
114  virtual void setByteOrder(int byteOrder) OVERRIDE FINAL {
115  // called from receive thread... or before processing
116  _receiveBuffer.setEndianess(byteOrder);
117 
118  // sync?!
119  _sendBuffer.setEndianess(byteOrder);
120  }
121 
122  virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
123 
124  virtual void flushSendQueue() OVERRIDE FINAL;
125 
126  void start();
127 
128  virtual void close() OVERRIDE FINAL;
129 
130  virtual void ensureData(std::size_t size) OVERRIDE FINAL;
131 
132  virtual void alignData(std::size_t alignment) OVERRIDE FINAL {
133  _receiveBuffer.align(alignment);
134  }
135 
136  virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/,
137  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
138  {
139  return false;
140  }
141 
142  virtual bool directDeserialize(epics::pvData::ByteBuffer* /*existingBuffer*/, char* /*deserializeTo*/,
143  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
144  {
145  return false;
146  }
147 
148  virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
149  virtual void endMessage() OVERRIDE FINAL;
150 
151  virtual void flush(bool /*lastMessageCompleted*/) OVERRIDE FINAL {
152  // noop since all UDP requests are sent immediately
153  }
154 
155  virtual void setRecipient(const osiSockAddr& sendTo) OVERRIDE FINAL {
156  _sendToEnabled = true;
157  _sendTo = sendTo;
158  }
159 
160  void setLocalMulticastAddress(const osiSockAddr& sendTo) {
161  _localMulticastAddressEnabled = true;
162  _localMulticastAddress = sendTo;
163  }
164 
165  bool hasLocalMulticastAddress() const {
166  return _localMulticastAddressEnabled;
167  }
168 
169  const osiSockAddr& getLocalMulticastAddress() const {
170  return _localMulticastAddress;
171  }
172 
173  virtual void flushSerializeBuffer() OVERRIDE FINAL {
174  // noop
175  }
176 
177  virtual void ensureBuffer(std::size_t /*size*/) OVERRIDE FINAL {
178  // noop
179  }
180 
181  virtual void alignBuffer(std::size_t alignment) OVERRIDE FINAL {
182  _sendBuffer.align(alignment);
183  }
184 
185  virtual void cachedSerialize(
186  const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
187  {
188  // no cache
189  field->serialize(buffer, this);
190  }
191 
192  virtual std::tr1::shared_ptr<const epics::pvData::Field>
193  cachedDeserialize(epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
194  {
195  // no cache
196  // TODO
197  return epics::pvData::getFieldCreate()->deserialize(buffer, this);
198  }
199 
200  virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
201  {
202  return false;
203  }
204 
205  virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
206 
207  /**
208  * Set ignore list.
209  * @param address list of ignored addresses.
210  */
211  void setIgnoredAddresses(const InetAddrVector& addresses) {
212  _ignoredAddresses = addresses;
213  }
214 
215  /**
216  * Get list of ignored addresses.
217  * @return ignored addresses.
218  */
219  const InetAddrVector& getIgnoredAddresses() const {
220  return _ignoredAddresses;
221  }
222 
223  /**
224  * Set tapped NIF list.
225  * @param NIF address list to tap.
226  */
227  void setTappedNIF(const InetAddrVector& addresses) {
228  _tappedNIF = addresses;
229  }
230 
231  /**
232  * Get list of tapped NIF addresses.
233  * @return tapped NIF addresses.
234  */
235  const InetAddrVector& getTappedNIF() const {
236  return _tappedNIF;
237  }
238 
239  bool send(const char* buffer, size_t length, const osiSockAddr& address);
240 
241  bool send(epics::pvData::ByteBuffer* buffer, const osiSockAddr& address);
242 
243  bool send(epics::pvData::ByteBuffer* buffer, InetAddressType target = inetAddressType_all);
244 
245  /**
246  * Get list of send addresses.
247  * @return send addresses.
248  */
249  const InetAddrVector& getSendAddresses() {
250  return _sendAddresses;
251  }
252 
253  /**
254  * Get bind address.
255  * @return bind address.
256  */
257  const osiSockAddr* getBindAddress() const {
258  return &_bindAddress;
259  }
260 
261  bool isBroadcastAddress(const osiSockAddr* address, const InetAddrVector& broadcastAddresses)
262  {
263  for (size_t i = 0; i < broadcastAddresses.size(); i++)
264  if (broadcastAddresses[i].ia.sin_addr.s_addr == address->ia.sin_addr.s_addr)
265  return true;
266  return false;
267  }
268 
269  // consumes arguments
270  void setSendAddresses(InetAddrVector& addresses, std::vector<bool>& address_types) {
271  _sendAddresses.swap(addresses);
272  _isSendAddressUnicast.swap(address_types);
273  }
274 
275  void join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr);
276 
277  void setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback);
278 
279 protected:
280  AtomicBoolean _closed;
281 
282  /**
283  * Response handler.
284  */
285  ResponseHandler::shared_pointer _responseHandler;
286 
287  virtual void run() OVERRIDE FINAL;
288 
289 private:
290  bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer);
291 
292  void close(bool waitForThreadToComplete);
293 
294  // Context only used for logging in this class
295 
296  /**
297  * Corresponding channel.
298  */
299  SOCKET _channel;
300 
301  /* When provided, this transport is used for replies (passed to handler)
302  * instead of *this. This feature is used in the situation where broadcast
303  * traffic is received on one socket, but a different socket must be used
304  * for unicast replies.
305  *
306  Transport::shared_pointer _replyTransport;
307  */
308 
309  /**
310  * Bind address.
311  */
312  osiSockAddr _bindAddress;
313 
314  /**
315  * Remote address.
316  */
317  osiSockAddr _remoteAddress;
318  std::string _remoteName;
319 
320  /**
321  * Send addresses.
322  */
323  InetAddrVector _sendAddresses;
324 
325  std::vector<bool> _isSendAddressUnicast;
326 
327  /**
328  * Ignore addresses.
329  */
330  InetAddrVector _ignoredAddresses;
331 
332  /**
333  * Tapped NIF addresses.
334  */
335  InetAddrVector _tappedNIF;
336 
337  /**
338  * Send address.
339  */
340  osiSockAddr _sendTo;
341  bool _sendToEnabled;
342 
343  /**
344  * Local multicast address.
345  */
346  osiSockAddr _localMulticastAddress;
347  bool _localMulticastAddressEnabled;
348 
349  /**
350  * Receive buffer.
351  */
352  epics::pvData::ByteBuffer _receiveBuffer;
353 
354  /**
355  * Send buffer.
356  */
357  epics::pvData::ByteBuffer _sendBuffer;
358 
359  /**
360  * Last message start position.
361  */
362  int _lastMessageStartPosition;
363 
364  /**
365  * Used for process sync.
366  */
367  epics::pvData::Mutex _mutex;
368  epics::pvData::Mutex _sendMutex;
369 
370  /**
371  * Thread ID
372  */
373  epics::auto_ptr<epicsThread> _thread;
374 
375  epics::pvData::int8 _clientServerWithEndianFlag;
376 
377 };
378 
379 class BlockingUDPConnector{
380 public:
381  POINTER_DEFINITIONS(BlockingUDPConnector);
382 
383  BlockingUDPConnector(bool serverFlag) :_serverFlag(serverFlag) {}
384 
385  /**
386  * NOTE: transport client is ignored for broadcast (UDP).
387  */
388  BlockingUDPTransport::shared_pointer connect(
389  ResponseHandler::shared_pointer const & responseHandler,
390  osiSockAddr& bindAddress,
391  epics::pvData::int8 transportRevision);
392 
393 private:
394 
395  /**
396  * Client/server flag.
397  */
398  bool _serverFlag;
399 
400  EPICS_NOT_COPYABLE(BlockingUDPConnector)
401 };
402 
403 typedef std::vector<BlockingUDPTransport::shared_pointer> BlockingUDPTransportVector;
404 
405 void initializeUDPTransports(
406  bool serverFlag,
407  BlockingUDPTransportVector& udpTransports,
408  const IfaceNodeVector& ifaceList,
409  const ResponseHandler::shared_pointer& responseHandler,
410  BlockingUDPTransport::shared_pointer& sendTransport,
411  epics::pvData::int32& listenPort,
412  bool autoAddressList,
413  const std::string& addressList,
414  const std::string& ignoreAddressList);
415 
416 
417 }
418 }
419 
420 #endif /* BLOCKINGUDP_H_ */
virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const &data)=0
Pass data to the active security plug-in session.
#define OVERRIDE
Definition: pvAccess.h:55
#define FINAL
Copyright - See the COPYRIGHT that is included with this distribution.
Definition: pvAccess.h:48