blob: a4f6226cebc581938de1a009f4229a30710ba348 [file] [log] [blame]
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
* Copyright 2018 (c) Jose Cabral, fortiss GmbH
*/
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/pubsub_udp.h>
#include <open62541/util.h>
// UDP multicast network layer specific internal data
typedef struct {
int ai_family; //Protocol family for socket. IPv4/IPv6
struct sockaddr_storage *ai_addr; //https://msdn.microsoft.com/de-de/library/windows/desktop/ms740496(v=vs.85).aspx
UA_UInt32 messageTTL;
UA_Boolean enableLoopback;
UA_Boolean enableReuse;
} UA_PubSubChannelDataUDPMC;
/**
* Open communication socket based on the connectionConfig. Protocol specific parameters are
* provided within the connectionConfig as KeyValuePair.
* Currently supported options: "ttl" , "loopback", "reuse"
*
* @return ref to created channel, NULL on error
*/
static UA_PubSubChannel *
UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
UA_initialize_architecture_network();
UA_NetworkAddressUrlDataType address;
if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data;
} else {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Invalid Address.");
return NULL;
}
//allocate and init memory for the UDP multicast specific internal data
UA_PubSubChannelDataUDPMC * channelDataUDPMC =
(UA_PubSubChannelDataUDPMC *) UA_calloc(1, (sizeof(UA_PubSubChannelDataUDPMC)));
if(!channelDataUDPMC){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
return NULL;
}
//set default values
UA_PubSubChannelDataUDPMC defaultValues = {0, NULL, 255, UA_TRUE, UA_TRUE};
memcpy(channelDataUDPMC, &defaultValues, sizeof(UA_PubSubChannelDataUDPMC));
//iterate over the given KeyValuePair paramters
UA_String ttlParam = UA_STRING("ttl"), loopbackParam = UA_STRING("loopback"), reuseParam = UA_STRING("reuse");
for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &ttlParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
channelDataUDPMC->messageTTL = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
}
} else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &loopbackParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableLoopback = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
}
} else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &reuseParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableReuse = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
}
} else {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation. Unknown connection parameter.");
}
}
UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel));
if(!newChannel){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
UA_free(channelDataUDPMC);
return NULL;
}
struct addrinfo hints, *rp, *requestResult = NULL;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
UA_String hostname, path;
UA_UInt16 networkPort;
if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Invalid URL.");
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
if(hostname.length > 512) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. URL maximum length is 512.");
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
UA_STACKARRAY(char, addressAsChar, sizeof(char) * hostname.length +1);
memcpy(addressAsChar, hostname.data, hostname.length);
addressAsChar[hostname.length] = 0;
char port[6];
sprintf(port, "%u", networkPort);
if(UA_getaddrinfo(addressAsChar, port, &hints, &requestResult) != 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Internal error.");
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
//check if the ip address is a multicast address
if(requestResult->ai_family == PF_INET){
struct in_addr imr_interface;
UA_inet_pton(AF_INET, addressAsChar, &imr_interface);
if((UA_ntohl(imr_interface.s_addr) & 0xF0000000) != 0xE0000000){
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. No multicast address.");
}
} else {
//TODO check if ipv6 addrr is multicast address.
}
for(rp = requestResult; rp != NULL; rp = rp->ai_next){
newChannel->sockfd = UA_socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(newChannel->sockfd != UA_INVALID_SOCKET){
break; /*success*/
}
}
if(!rp){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Internal error.");
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
channelDataUDPMC->ai_family = rp->ai_family;
channelDataUDPMC->ai_addr = (struct sockaddr_storage *) UA_calloc(1, sizeof(struct sockaddr_storage));
if(!channelDataUDPMC->ai_addr){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Out of memory.");
UA_close(newChannel->sockfd);
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
memcpy(channelDataUDPMC->ai_addr, rp->ai_addr, sizeof(*rp->ai_addr));
//link channel and internal channel data
newChannel->handle = channelDataUDPMC;
//Set loop back data to your host
#if UA_IPV6
if(UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP,
(const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback))
#else
if(UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_LOOP,
(const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback))
#endif
< 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Loopback setup failed.");
UA_close(newChannel->sockfd);
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
//Set Time to live (TTL). Value of 1 prevent forward beyond the local network.
#if UA_IPV6
if(UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL,
(const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL))
#else
if(UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_TTL,
(const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL))
#endif
< 0) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Time to live setup failed.");
}
//Set reuse address -> enables sharing of the same listening address on different sockets.
if(channelDataUDPMC->enableReuse){
int enableReuse = 1;
if(UA_setsockopt(newChannel->sockfd,
SOL_SOCKET, SO_REUSEADDR,
(const char*)&enableReuse, sizeof(enableReuse)) < 0){
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Reuse address setup failed.");
}
}
//Set the physical interface for outgoing traffic
if(address.networkInterface.length > 0){
UA_STACKARRAY(char, interfaceAsChar, sizeof(char) * address.networkInterface.length + 1);
memcpy(interfaceAsChar, address.networkInterface.data, address.networkInterface.length);
interfaceAsChar[address.networkInterface.length] = 0;
enum{
IPv4,
#if UA_IPV6
IPv6,
#endif
INVALID
} ipVersion;
union {
struct ip_mreq ipv4;
#if UA_IPV6
struct ipv6_mreq ipv6;
#endif
} group;
if(UA_inet_pton(AF_INET, interfaceAsChar, &group.ipv4.imr_interface)){
ipVersion = IPv4;
#if UA_IPV6
} else if (UA_inet_pton(AF_INET6, interfaceAsChar, &group.ipv6.ipv6mr_multiaddr)){
group.ipv6.ipv6mr_interface = UA_if_nametoindex(interfaceAsChar);
ipVersion = IPv6;
#endif
} else {
ipVersion = INVALID;
}
if(ipVersion == INVALID ||
#if UA_IPV6
UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_IF : IP_MULTICAST_IF,
ipVersion == IPv6 ? (const void *) &group.ipv6.ipv6mr_interface : &group.ipv4.imr_interface,
ipVersion == IPv6 ? sizeof(group.ipv6.ipv6mr_interface) : sizeof(struct in_addr))
#else
UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_IF,
&group.ipv4.imr_interface,
sizeof(struct in_addr))
#endif
< 0) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Interface selection failed.");
};
}
UA_freeaddrinfo(requestResult);
newChannel->state = UA_PUBSUB_CHANNEL_PUB;
return newChannel;
}
/**
* Subscribe to a given address.
*
* @return UA_STATUSCODE_GOOD on success
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings,
void (*notUsedHere)(UA_ByteString *encodedBuffer, UA_ByteString *topic)) {
if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_RDY)){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
if(connectionConfig->ai_family == PF_INET){//IPv4 handling
struct sockaddr_in addr;
memcpy(&addr, connectionConfig->ai_addr, sizeof(struct sockaddr_in));
addr.sin_addr.s_addr = INADDR_ANY;
if (UA_bind(channel->sockfd, (const struct sockaddr *)&addr, sizeof(struct sockaddr_in)) != 0){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
struct ip_mreq groupV4;
memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
groupV4.imr_interface.s_addr = UA_htonl(INADDR_ANY);
//multihomed hosts can join several groups on different IF, INADDR_ANY -> kernel decides
if(UA_setsockopt(channel->sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection not on multicast");
}
#if UA_IPV6
} else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
//TODO implement regist for IPv6
#endif
} else {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
return UA_STATUSCODE_GOOD;
}
/**
* Remove current subscription.
*
* @return UA_STATUSCODE_GOOD on success
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
if(!(channel->state == UA_PUBSUB_CHANNEL_PUB_SUB || channel->state == UA_PUBSUB_CHANNEL_SUB)){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
if(connectionConfig->ai_family == PF_INET){//IPv4 handling
struct ip_mreq groupV4;
memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
groupV4.imr_interface.s_addr = UA_htonl(INADDR_ANY);
if(UA_setsockopt(channel->sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
#if UA_IPV6
} else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
//TODO implement unregist for IPv6
#endif
} else {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
return UA_STATUSCODE_GOOD;
}
/**
* Send messages to the connection defined address
*
* @return UA_STATUSCODE_GOOD if success
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettigns, const UA_ByteString *buf) {
UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)){
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed. Invalid state.");
return UA_STATUSCODE_BADINTERNALERROR;
}
//TODO evalute: chunk messages or check against MTU?
long nWritten = 0;
while (nWritten < (long)buf->length) {
long n = (long)UA_sendto(channel->sockfd, buf->data, buf->length, 0,
(struct sockaddr *) channelConfigUDPMC->ai_addr, sizeof(struct sockaddr_storage));
if(n == -1L) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
nWritten += n;
}
return UA_STATUSCODE_GOOD;
}
/**
* Receive messages. The regist function should be called before.
*
* @param timeout in usec | on windows platforms are only multiples of 1000usec possible
* @return
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_receive(UA_PubSubChannel *channel, UA_ByteString *message, UA_ExtensionObject *transportSettigns, UA_UInt32 timeout){
if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection receive failed. Invalid state.");
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
if(timeout > 0) {
fd_set fdset;
FD_ZERO(&fdset);
UA_fd_set(channel->sockfd, &fdset);
struct timeval tmptv = {(long int)(timeout / 1000000),
(long int)(timeout % 1000000)};
int resultsize = UA_select(channel->sockfd+1, &fdset, NULL,
NULL, &tmptv);
if(resultsize == 0) {
message->length = 0;
return UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
}
if (resultsize == -1) {
message->length = 0;
return UA_STATUSCODE_BADINTERNALERROR;
}
}
if(channelConfigUDPMC->ai_family == PF_INET){
ssize_t messageLength;
messageLength = UA_recvfrom(channel->sockfd, message->data, message->length, 0, NULL, NULL);
if(messageLength > 0){
message->length = (size_t) messageLength;
} else {
message->length = 0;
}
#if UA_IPV6
} else {
//TODO implement recieve for IPv6
#endif
}
return UA_STATUSCODE_GOOD;
}
/**
* Close channel and free the channel data.
*
* @return UA_STATUSCODE_GOOD if success
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_close(UA_PubSubChannel *channel) {
if(UA_close(channel->sockfd) != 0){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection delete failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_deinitialize_architecture_network();
//cleanup the internal NetworkLayer data
UA_PubSubChannelDataUDPMC *networkLayerData = (UA_PubSubChannelDataUDPMC *) channel->handle;
UA_free(networkLayerData->ai_addr);
UA_free(networkLayerData);
UA_free(channel);
return UA_STATUSCODE_GOOD;
}
/**
* Generate a new channel. based on the given configuration.
*
* @param connectionConfig connection configuration
* @return ref to created channel, NULL on error
*/
static UA_PubSubChannel *
TransportLayerUDPMC_addChannel(UA_PubSubConnectionConfig *connectionConfig) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub channel requested");
UA_PubSubChannel * pubSubChannel = UA_PubSubChannelUDPMC_open(connectionConfig);
if(pubSubChannel){
pubSubChannel->regist = UA_PubSubChannelUDPMC_regist;
pubSubChannel->unregist = UA_PubSubChannelUDPMC_unregist;
pubSubChannel->send = UA_PubSubChannelUDPMC_send;
pubSubChannel->receive = UA_PubSubChannelUDPMC_receive;
pubSubChannel->close = UA_PubSubChannelUDPMC_close;
pubSubChannel->connectionConfig = connectionConfig;
}
return pubSubChannel;
}
//UDPMC channel factory
UA_PubSubTransportLayer
UA_PubSubTransportLayerUDPMP() {
UA_PubSubTransportLayer pubSubTransportLayer;
pubSubTransportLayer.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
pubSubTransportLayer.createPubSubChannel = &TransportLayerUDPMC_addChannel;
return pubSubTransportLayer;
}