1 module kaleidic.nanomsg.wrap; 2 import kaleidic.nanomsg.bindings; 3 4 import std.stdio; 5 import std.conv; 6 import std.string; 7 8 enum NanoSocketOptions 9 { 10 linger = NN.LINGER, 11 sendBuffer = NN.SNDBUF, 12 receiveBuffer = NN.RCVBUF, 13 sendTimeOut = NN.SNDTIMEO, 14 receiveTimeOut = NN.RCVTIMEO, 15 reconnectInterval = NN.RECONNECT_IVL, 16 reconnectIntervalMax = NN.RECONNECT_IVL_MAX, 17 sendPriority = NN.SNDPRIO, 18 receivePriority = NN.RCVPRIO, 19 receiveFD = NN.RCVFD, 20 domain = NN.DOMAIN, 21 protocol = NN.PROTOCOL, 22 ip4Only = NN.IPV4ONLY, 23 socketName = NN.SOCKET_NAME, 24 receiveMaxSize = NN.RCVMAXSIZE, 25 } 26 27 struct NanoMessage 28 { 29 // default constructor no longer allowed 30 // so this() must always be called with specific 31 // arguments. defaults used to be: 32 // param1=AF_SP, param2=NN_REP 33 // probably better this way anyway 34 35 char *url; 36 int sock=-1; 37 char* buf = null; 38 bool isShutDown=true; 39 int eid=-1; 40 41 @disable this(this); 42 43 void createSocket(int param1,int param2) 44 { 45 import std.exception:enforce; 46 this.sock = nn_socket(param1,param2); 47 enforce(sock >= 0,"cannot create nanomsg socket for modes "~ to!string(param1) ~ " "~ to!string(param2)~"\n"~errorMessage()~"\n"); 48 isShutDown = false; 49 } 50 51 ~this() 52 { 53 import std.stdio; 54 55 writefln("nanomsg destructor running!!!"); 56 /* 57 if (!isShutDown) 58 this.shutdown(); */ 59 if(sock>-1) 60 nn_close(sock); 61 sock=-1; 62 eid=-1; 63 } 64 } 65 66 string errorMessage() 67 { 68 return nn_strerror(nn_errno()).fromStringz.idup; 69 } 70 71 auto surl(ref NanoMessage nano) 72 { 73 return nano.url.fromStringz; 74 } 75 76 auto ref bind(ref NanoMessage nano, string surl) 77 { 78 writefln("* binding to: %s",surl); 79 nano.open(surl,true); 80 return nano; 81 } 82 auto ref connect(ref NanoMessage nano, string surl) 83 { 84 nano.open(surl,false); 85 return nano; 86 } 87 88 auto ref open(ref NanoMessage nano,string surl, bool bind=true) 89 { 90 import std.exception:enforce; 91 enforce(nano.sock>=0,"cannot create nanomsg socket for "~surl~": "~errorMessage()); 92 if (bind) 93 nano.eid = nn_bind(nano.sock,toStringz(surl)); 94 else 95 nano.eid = nn_connect(nano.sock,toStringz(surl)); 96 writefln("* eid = %s for %s to: %s",nano.eid,bind?"bind":"connect",surl); 97 enforce(nano.eid >= 0,"nanomsg did not " ~ (bind?"bind":"connect")~" to new socket for "~surl~": "~errorMessage()); 98 return nano; 99 } 100 ubyte[] receive(ref NanoMessage nano, int flags, bool pubsub=false) 101 { 102 ubyte[] recvbytes; 103 nano.buf=null; 104 //consider returning as sized array without copy 105 auto numbytes = nn_recv(nano.sock,&nano.buf,NN_MSG,flags); 106 scope(exit) 107 { 108 if(nano.buf !is null) 109 nn_freemsg(nano.buf); 110 nano.buf=null; 111 } 112 if (numbytes >= 0) 113 { 114 recvbytes.length=numbytes+1; 115 foreach(i;0..numbytes) 116 { 117 recvbytes[i]=nano.buf[i]; 118 } 119 return recvbytes; 120 } 121 else 122 { 123 if(pubsub) 124 return []; // TO DO FIX ME - distinguish between error and no message for me 125 else 126 throw new Exception("nanomsg encountered an error whilst trying to receive a message for "~to!string(nano.url) ~ " error:"~ errorMessage()); 127 } 128 assert(0); 129 } 130 131 string receiveAsString(ref NanoMessage nano, int flags=0, bool pubSub=false) 132 { 133 return to!string(cast(char[])nano.receive(flags,pubSub)); 134 } 135 136 int send(ref NanoMessage nano, char* mybuf, size_t numbytes, bool nonBlocking=false) 137 { 138 return nn_send(nano.sock,mybuf,numbytes,nonBlocking?NN_DONTWAIT:0).errcheck("send(char*)"); 139 } 140 141 int send(ref NanoMessage nano, ubyte[] mybuf, bool nonBlocking=false) 142 { 143 return nn_send(nano.sock,cast(char*)mybuf.ptr,cast(int)(mybuf.length),nonBlocking?NN_DONTWAIT:0).errcheck("send(ubyte[])"); 144 } 145 146 int send(ref NanoMessage nano, string mybuf, bool nonBlocking=false) 147 { 148 return nn_send(nano.sock,mybuf.ptr,mybuf.length+1,nonBlocking?NN_DONTWAIT:0).errcheck("send(string mybuf)"); 149 } 150 151 auto ref setOpt(ref NanoMessage nano,int level, int option, string stringVal) 152 { 153 nn_setsockopt(nano.sock,level,option,stringVal.toStringz,stringVal.length); 154 return nano; 155 } 156 157 auto ref setOpt(T)(ref NanoMessage nano, int level, int option, T* optval) 158 { 159 nn_setsockopt(nano.sock,level,option,optval,(optval).size); 160 return nano; 161 } 162 163 auto ref getOpt(ref NanoMessage nano, int level, int option, void* optval, size_t *optvallen) 164 { 165 nn_getsockopt(nano.sock,level,option,optval,optvallen); 166 return nano; 167 } 168 auto ref close(ref NanoMessage nano) 169 { 170 if(nano.sock!=-1) 171 errcheck(nn_close(nano.sock),"close()"); 172 nano.sock=-1; 173 nano.eid=-1; 174 return nano; 175 } 176 177 int sendMessage(ref NanoMessage nano,const nn_msghdr* msghdr, int flags) 178 { 179 return nn_sendmsg(nano.sock,msghdr,flags); 180 } 181 182 int receiveMessage(ref NanoMessage nano, nn_msghdr* msghdr, int flags) 183 { 184 return nn_recvmsg(nano.sock,msghdr,flags).errcheck("receiveMessage"); 185 } 186 187 bool canReceive(ref NanoMessage nano) 188 { 189 nn_pollfd pfd; 190 pfd.fd = nano.sock; 191 pfd.events = NN_POLLIN | NN_POLLOUT; 192 auto rc = nn_poll(&pfd, 1, 100); 193 return false; 194 // if (res==-1) 195 // throw new Exception("unable to check socket readiness status - res=" ~ to!string(res) ~ "errno=" ~ to!string(nn_errno())); 196 // writefln("* returning can rx: %s", pfd.revents); 197 // return (pfd.revents & NN_POLLIN)!=0; 198 } 199 200 bool canSend(ref NanoMessage nano) 201 { 202 import std.exception:enforce; 203 nn_pollfd pfd; 204 pfd.fd=nano.sock; 205 pfd.events=NN_POLLOUT; 206 auto res=nn_poll(&pfd,1,2000); 207 enforce(res!=-1,"nanomsg: unable to check socket readiness status- "~errorMessage()); 208 return(pfd.revents&& NN_POLLOUT)!=0; 209 } 210 211 void freeMessage(ref NanoMessage nano) 212 { 213 if (nano.buf) 214 nn_freemsg(nano.buf); 215 nano.buf=null; 216 } 217 218 auto ref shutdown(ref NanoMessage nano) 219 { 220 if (nano.buf) 221 nn_freemsg(nano.buf); 222 223 if(nano.eid>-1) 224 { 225 auto ret=nn_shutdown(nano.sock,nano.eid); // we should check this value and throw exception if need be 226 } 227 nano.eid=-1; 228 nano.sock=-1; 229 nano.isShutDown=true; 230 return nano; 231 } 232 233 private int errcheck(int retval, string caller="") 234 { 235 import std.exception:enforce; 236 enforce(retval!=-1,"nanomsg error"~caller~": " ~ errorMessage()); 237 return retval; 238 }