1 module kaleidic.nanomsg.wrap;
2 import kaleidic.nanomsg.bindings;
3 
4 import std.stdio;
5 import std.conv;
6 import std.string;
7 
/**
 * Readable aliases for the socket option constants of the bindings' `NN`
 * enumeration.  Each member simply carries the value of the `NN` constant
 * it is initialised from, so it can be passed wherever an option id is
 * expected (e.g. the `option` argument of setOpt/getOpt).
 */
enum NanoSocketOptions
{
    // buffering, timeouts and lingering
    linger               = NN.LINGER,
    sendBuffer           = NN.SNDBUF,
    receiveBuffer        = NN.RCVBUF,
    sendTimeOut          = NN.SNDTIMEO,
    receiveTimeOut       = NN.RCVTIMEO,

    // reconnection and priorities
    reconnectInterval    = NN.RECONNECT_IVL,
    reconnectIntervalMax = NN.RECONNECT_IVL_MAX,
    sendPriority         = NN.SNDPRIO,
    receivePriority      = NN.RCVPRIO,

    // socket introspection and miscellaneous
    receiveFD            = NN.RCVFD,
    domain               = NN.DOMAIN,
    protocol             = NN.PROTOCOL,
    ip4Only              = NN.IPV4ONLY,
    socketName           = NN.SOCKET_NAME,
    receiveMaxSize       = NN.RCVMAXSIZE,
}
26 
/**
 * Owning wrapper around a nanomsg socket handle.
 *
 * The struct has no constructor: a default-initialised value owns nothing
 * (`sock == -1`, `eid == -1`, `isShutDown == true`).  Call createSocket()
 * with an explicit domain and protocol before using the socket; there are
 * deliberately no default arguments for them.
 *
 * Copying is disabled so that only one value ever closes a given socket.
 */
struct NanoMessage
{
    char* url;              /// endpoint address as a C string (may be null)
    int sock = -1;          /// nanomsg socket handle, -1 when no socket is open
    char* buf = null;       /// message buffer handed out by nanomsg, null when none is held
    bool isShutDown = true; /// false once createSocket() has succeeded
    int eid = -1;           /// endpoint id returned by bind/connect, -1 when none

    @disable this(this);

    /**
     * Creates the underlying nanomsg socket.
     *
     * If this value already owns a socket it is closed first, so calling
     * createSocket() twice does not leak the earlier handle.
     *
     * Params:
     *   param1 = socket domain passed to nn_socket
     *   param2 = socket protocol passed to nn_socket
     * Throws: Exception when nn_socket reports failure.
     */
    void createSocket(int param1, int param2)
    {
        import std.exception : enforce;

        // Release a previously created socket before overwriting the handle.
        if (sock > -1)
        {
            nn_close(sock);
            sock = -1;
            eid = -1;
            isShutDown = true;
        }

        this.sock = nn_socket(param1, param2);
        enforce(sock >= 0, "cannot create nanomsg socket for modes " ~ to!string(param1) ~ " " ~ to!string(param2) ~ "\n" ~ errorMessage() ~ "\n");
        isShutDown = false;
    }

    /// Closes the socket if one is still open and resets the handles.
    ~this()
    {
        import std.stdio;

        // Trace output kept from the original implementation.
        writefln("nanomsg destructor running!!!");
        if (sock > -1)
            nn_close(sock);
        sock = -1;
        eid = -1;
    }
}
65 
/// Returns nanomsg's textual description of the current `nn_errno()` value
/// as an immutable D string.
string errorMessage()
{
    const code = nn_errno();
    auto description = fromStringz(nn_strerror(code));
    return description.idup;
}
70 
/// Returns the socket's `url` field viewed as a D slice (no copy is made).
auto surl(ref NanoMessage nano)
{
    auto address = fromStringz(nano.url);
    return address;
}
75 
/**
 * Binds the socket to `surl` by delegating to open() in bind mode.
 * Prints the address being bound to stdout first.
 *
 * Returns: `nano`, to allow call chaining.
 */
auto ref bind(ref NanoMessage nano, string surl)
{
    writefln("* binding to: %s", surl);
    open(nano, surl, true);
    return nano;
}
/**
 * Connects the socket to `surl` by delegating to open() in connect mode.
 *
 * Returns: `nano`, to allow call chaining.
 */
auto ref connect(ref NanoMessage nano, string surl)
{
    open(nano, surl, false);
    return nano;
}
87 
/**
 * Adds an endpoint to an already created socket.
 *
 * Params:
 *   nano = socket wrapper; its `sock` must be a valid handle
 *   surl = endpoint address
 *   bind = true to nn_bind to the address, false to nn_connect to it
 *
 * On success the endpoint id is stored in `nano.eid` and a NUL-terminated
 * copy of the address is stored in `nano.url`, so that surl() and later
 * error messages can report which endpoint the socket is using.
 *
 * Returns: `nano`, to allow call chaining.
 * Throws: Exception if the socket handle is invalid or nanomsg refuses
 *         the bind/connect.
 */
auto ref open(ref NanoMessage nano, string surl, bool bind = true)
{
    import std.exception : enforce;

    enforce(nano.sock >= 0, "cannot create nanomsg socket for " ~ surl ~ ": " ~ errorMessage());

    // One mutable, NUL-terminated copy serves both the C call and nano.url.
    char[] address = surl.dup ~ '\0';

    if (bind)
        nano.eid = nn_bind(nano.sock, address.ptr);
    else
        nano.eid = nn_connect(nano.sock, address.ptr);

    writefln("* eid = %s for %s to: %s", nano.eid, bind ? "bind" : "connect", surl);
    enforce(nano.eid >= 0, "nanomsg did not " ~ (bind ? "bind" : "connect") ~ " to new socket for " ~ surl ~ ": " ~ errorMessage());

    // Remember the address only once the endpoint really exists.
    nano.url = address.ptr;
    return nano;
}
/**
 * Receives one message and returns a GC-owned copy of it.
 *
 * nanomsg allocates the message buffer (NN_MSG); it is always released
 * again before this function returns, and `nano.buf` is reset to null.
 *
 * Params:
 *   nano   = socket wrapper to receive on
 *   flags  = flags forwarded to nn_recv
 *   pubsub = when true, a failed receive yields an empty array instead of
 *            an exception (errors and "no message" are not distinguished)
 *
 * Returns: the message bytes followed by one extra zero byte, i.e. an array
 *          of length `received + 1`.
 *          NOTE(review): the extra byte looks like a NUL terminator kept for
 *          C-string consumers - confirm callers rely on it before trimming.
 * Throws: Exception when nn_recv fails and `pubsub` is false.
 */
ubyte[] receive(ref NanoMessage nano, int flags, bool pubsub=false)
{
    nano.buf = null;
    auto received = nn_recv(nano.sock, &nano.buf, NN_MSG, flags);

    // Whatever happens below, hand the nanomsg buffer back.
    scope(exit)
    {
        if (nano.buf !is null)
            nn_freemsg(nano.buf);
        nano.buf = null;
    }

    if (received < 0)
    {
        if (pubsub)
            return [];  // TO DO FIX ME - distinguish between error and no message for me
        throw new Exception("nanomsg encountered an error whilst trying to receive a message for "~to!string(nano.url) ~ " error:"~ errorMessage());
    }

    // Zero-initialised, one byte longer than the payload.
    auto payload = new ubyte[received + 1];
    payload[0 .. received] = (cast(ubyte*) nano.buf)[0 .. received];
    return payload;
}
130 
/// Receives one message via receive() and returns its bytes reinterpreted
/// as characters and converted to a `string`.
string receiveAsString(ref NanoMessage nano, int flags=0, bool pubSub=false)
{
    ubyte[] raw = receive(nano, flags, pubSub);
    auto text = cast(char[]) raw;
    return to!string(text);
}
135 
/**
 * Sends `numbytes` bytes starting at `mybuf`.
 *
 * Params:
 *   nonBlocking = when true the NN_DONTWAIT flag is passed to nn_send
 *
 * Returns: the value returned by nn_send.
 * Throws: Exception (via errcheck) when nn_send returns -1.
 */
int send(ref NanoMessage nano, char* mybuf, size_t numbytes, bool nonBlocking=false)
{
    const flags = nonBlocking ? NN_DONTWAIT : 0;
    auto sent = nn_send(nano.sock, mybuf, numbytes, flags);
    return errcheck(sent, "send(char*)");
}
140 
/**
 * Sends the contents of a byte array.
 *
 * The array length is passed on unchanged; it is not narrowed to `int`
 * first, so large buffers are not truncated or sign-extended.
 *
 * Params:
 *   nonBlocking = when true the NN_DONTWAIT flag is passed to nn_send
 *
 * Returns: the value returned by nn_send.
 * Throws: Exception (via errcheck) when nn_send returns -1.
 */
int send(ref NanoMessage nano, ubyte[] mybuf, bool nonBlocking=false)
{
    const flags = nonBlocking ? NN_DONTWAIT : 0;
    return nn_send(nano.sock, cast(char*) mybuf.ptr, mybuf.length, flags).errcheck("send(ubyte[])");
}
145 
/**
 * Sends a string including a trailing NUL byte (`mybuf.length + 1` bytes).
 *
 * A D string slice is not guaranteed to be followed by a NUL in memory, so
 * the text is first converted with toStringz; sending straight from
 * `mybuf.ptr` would read one byte past the end of the slice.
 *
 * Params:
 *   nonBlocking = when true the NN_DONTWAIT flag is passed to nn_send
 *
 * Returns: the value returned by nn_send.
 * Throws: Exception (via errcheck) when nn_send returns -1.
 */
int send(ref NanoMessage nano, string mybuf, bool nonBlocking=false)
{
    // Guaranteed to have mybuf.length characters plus a terminating NUL.
    auto terminated = mybuf.toStringz;
    const flags = nonBlocking ? NN_DONTWAIT : 0;
    return nn_send(nano.sock, terminated, mybuf.length + 1, flags).errcheck("send(string mybuf)");
}
150 
/**
 * Sets a string-valued socket option.
 *
 * Params:
 *   level     = option level passed to nn_setsockopt
 *   option    = option id passed to nn_setsockopt
 *   stringVal = option value; its length (without terminator) is passed as
 *               the option size
 *
 * Returns: `nano`, to allow call chaining.
 * Throws: Exception (via errcheck) when nn_setsockopt returns -1; since the
 *         function returns `nano`, a failure would otherwise go unnoticed.
 */
auto ref setOpt(ref NanoMessage nano,int level, int option, string stringVal)
{
    errcheck(nn_setsockopt(nano.sock, level, option, stringVal.toStringz, stringVal.length), "setOpt(string)");
    return nano;
}
156 
/**
 * Sets a socket option from a typed value.
 *
 * Params:
 *   level  = option level passed to nn_setsockopt
 *   option = option id passed to nn_setsockopt
 *   optval = pointer to the value; `T.sizeof` bytes are passed to nanomsg
 *
 * Returns: `nano`, to allow call chaining.
 * Throws: Exception (via errcheck) when nn_setsockopt returns -1.
 */
auto ref setOpt(T)(ref NanoMessage nano, int level, int option, T* optval)
{
    // The option size is the size of the pointed-to type, not of the pointer.
    errcheck(nn_setsockopt(nano.sock, level, option, optval, T.sizeof), "setOpt(T*)");
    return nano;
}
162 
/**
 * Reads a socket option into a caller-supplied buffer.
 *
 * Params:
 *   level     = option level passed to nn_getsockopt
 *   option    = option id passed to nn_getsockopt
 *   optval    = destination buffer
 *   optvallen = in/out size of the buffer, forwarded to nn_getsockopt
 *
 * Returns: `nano`, to allow call chaining.
 * Throws: Exception (via errcheck) when nn_getsockopt returns -1, so that
 *         callers never read a buffer that was not filled in.
 */
auto ref getOpt(ref NanoMessage nano, int level, int option, void* optval, size_t *optvallen)
{
    errcheck(nn_getsockopt(nano.sock, level, option, optval, optvallen), "getOpt()");
    return nano;
}
/**
 * Closes the socket if one is open and resets `sock` and `eid` to -1.
 *
 * Returns: `nano`, to allow call chaining.
 * Throws: Exception (via errcheck) when nn_close returns -1; in that case
 *         the handles are left untouched.
 */
auto ref close(ref NanoMessage nano)
{
    if (nano.sock != -1)
    {
        const rc = nn_close(nano.sock);
        errcheck(rc, "close()");
    }
    nano.sock = -1;
    nano.eid = -1;
    return nano;
}
176 
/// Forwards a message header to nn_sendmsg on this socket and returns the
/// raw result; the result is not checked here, so callers must test it.
int sendMessage(ref NanoMessage nano,const nn_msghdr* msghdr, int flags)
{
    auto result = nn_sendmsg(nano.sock, msghdr, flags);
    return result;
}
181 
/**
 * Receives into a message header via nn_recvmsg.
 *
 * Returns: the value returned by nn_recvmsg.
 * Throws: Exception (via errcheck) when nn_recvmsg returns -1.
 */
int receiveMessage(ref NanoMessage nano, nn_msghdr* msghdr, int flags)
{
    auto result = nn_recvmsg(nano.sock, msghdr, flags);
    return errcheck(result, "receiveMessage");
}
186 
/**
 * Reports whether a message can be received without blocking.
 *
 * Polls the socket for readability for at most 100 ms.
 *
 * Returns: true when nn_poll flags the socket with NN_POLLIN, false when
 *          the timeout expires first.
 * Throws: Exception when nn_poll itself fails.
 */
bool canReceive(ref NanoMessage nano)
{
    import std.exception : enforce;

    nn_pollfd pfd;
    pfd.fd = nano.sock;
    // Only readability matters here; also asking for NN_POLLOUT would end
    // the wait as soon as the socket became writable.
    pfd.events = NN_POLLIN;

    auto rc = nn_poll(&pfd, 1, 100);
    enforce(rc != -1, "nanomsg: unable to check socket readiness status- " ~ errorMessage());
    return (pfd.revents & NN_POLLIN) != 0;
}
199 
/**
 * Reports whether a message can be sent without blocking.
 *
 * Polls the socket for writability for at most 2000 ms.
 *
 * Returns: true when nn_poll flags the socket with NN_POLLOUT, false when
 *          the timeout expires first.
 * Throws: Exception when nn_poll itself fails.
 */
bool canSend(ref NanoMessage nano)
{
    import std.exception : enforce;

    nn_pollfd pfd;
    pfd.fd = nano.sock;
    pfd.events = NN_POLLOUT;

    auto res = nn_poll(&pfd, 1, 2000);
    enforce(res != -1, "nanomsg: unable to check socket readiness status- " ~ errorMessage());

    // Bitwise test of the returned event mask (not a logical AND).
    return (pfd.revents & NN_POLLOUT) != 0;
}
210 
/// Releases the nanomsg buffer held in `nano.buf`, if any, and clears the
/// field.
void freeMessage(ref NanoMessage nano)
{
    if (nano.buf is null)
        return;

    nn_freemsg(nano.buf);
    nano.buf = null;
}
217 
/**
 * Tears the socket down: releases any held message buffer, removes the
 * endpoint, closes the socket and resets the wrapper to its unopened state.
 *
 * The socket is closed here because `sock` is reset to -1 afterwards, which
 * means no later close()/destructor call could release it.  Teardown is
 * best effort: results of nn_shutdown and nn_close are not checked.
 *
 * Returns: `nano`, to allow call chaining.
 */
auto ref shutdown(ref NanoMessage nano)
{
    if (nano.buf !is null)
    {
        nn_freemsg(nano.buf);
        nano.buf = null;    // prevents a second free via freeMessage()
    }

    if (nano.sock > -1)
    {
        if (nano.eid > -1)
            nn_shutdown(nano.sock, nano.eid);
        nn_close(nano.sock);
    }

    nano.eid = -1;
    nano.sock = -1;
    nano.isShutDown = true;
    return nano;
}
232 
/**
 * Turns a nanomsg failure return into an exception.
 *
 * Params:
 *   retval = value returned by a nanomsg call
 *   caller = optional name of the wrapper function, included in the message
 *
 * Returns: `retval` unchanged when it is not -1.
 * Throws: Exception carrying the caller name (if given) and the current
 *         nanomsg error text when `retval` is -1.
 */
private int errcheck(int retval, string caller="")
{
    import std.exception : enforce;

    // Separate the caller from the fixed prefix so the message stays readable.
    enforce(retval != -1, "nanomsg error" ~ (caller.length ? " in " ~ caller : "") ~ ": " ~ errorMessage());
    return retval;
}