/// JSON-RPC server helpers: compile-time generated dispatch code for the
/// methods of a service class, plus thread-message and TCP front ends.
module rpcserver;

import ifinfo;
import asdf;
import std.json;

import std.experimental.logger : log, LogLevel;

/// Builds the `case` labels of the dispatch `switch`.
///
/// For every entry of `ifInfos` a `case "<name>":` is emitted that calls the
/// generated `_disp_<name>(params, resp)` and stores its result in `ret`.
/// A trailing `default` sets `ret` to -1.
///
/// Returns: D source text meant to be mixed into a `switch` body where
/// `ret`, `params` and `resp` are in scope.
private string _mixinDispatchCases(MethodInfo[] ifInfos) {
    string s = "";
    foreach (m; ifInfos) {
        s ~= `
            case "` ~ m.name ~ `":` ~ `
                ret = _disp_` ~ m.name ~ `(params, resp);
                break;`;
    }

    return s ~ `
            default: ret = -1;`;
}

/// Type/name pair describing one field of the generated parameter structs.
private struct ParamS {
    string type;
    string name;
}

/// Generates the source of one `_disp_<method>` dispatch function.
///
/// The generated function
/// $(UL
///   $(LI declares `_InParams` (every parameter whose storage class is not
///        `out`) and `_OutParams` (every `out` parameter plus a `ret` field),)
///   $(LI deserializes `params.toString` into `_InParams`,)
///   $(LI calls `<srvclass_name>.<method>(...)` and stores the result,)
///   $(LI serializes `_OutParams` into `resp`,)
///   $(LI returns 0 on success and -1 if any step threw.)
/// )
///
/// Params:
///   info          = meta information of the method to wrap
///   srvclass_name = identifier of the service object inside the mixin scope
///
/// Returns: D source text of the dispatch function.
private string _mixinDispatchMethod(MethodInfo info, string srvclass_name) {
    import std.array : join;

    ParamS[] _inParams, _outParams;
    string[] _pnames;

    // Split the parameters once; the same split decides both the struct a
    // parameter lives in and the struct it is read from at the call site, so
    // the generated call always has the full argument list.
    foreach (p; info.params) {
        if (p.storetype == "out") {
            _outParams ~= ParamS(p.type, p.name);
            _pnames ~= `_outp.` ~ p.name;
        }
        else {
            _inParams ~= ParamS(p.type, p.name);
            _pnames ~= `_inp.` ~ p.name;
        }
    }

    string r;
    r ~= `//// RPC-Srv-Method [` ~ info.name ~ `]
    ` ~ "int _disp_" ~ info.name ~ ` (JSONValue params, out string resp) {
        bool _parse_ok = false;
        `;

    // in-param
    r ~= `struct _InParams { `;
    foreach (p; _inParams) {
        r ~= p.type ~ " " ~ p.name ~ "; ";
    }
    r ~= `}`;

    // out-param
    r ~= ` struct _OutParams {`;
    foreach (p; _outParams) {
        r ~= p.type ~ " " ~ p.name ~ "; ";
    }
    r ~= info.rettype ~ ` ret; } _OutParams _outp;`;

    // deserialize the in-params and invoke the service method
    r ~= ` _InParams _inp; ` ~ info.rettype ~ ` ret;
    try {
        _inp = deserialize!_InParams(params.toString);`;

    r ~= `
        ret = ` ~ srvclass_name ~ `.` ~ info.name ~ `(`;
    r ~= _pnames.join(", ") ~ `);`;

    // out param assignment.
    r ~= `
        _outp.ret = ret;`;

    // The catch also covers exceptions thrown by the service method itself,
    // so the exception text is logged alongside the fixed message.
    r ~= `
        _parse_ok = true;
    } catch (Exception e) {
        _parse_ok = false;
        log("Error, deserialize In-Param of method error!", " ", e.msg);
    }
    `;

    // serialize the result
    r ~= `
    if (_parse_ok) {
        try {
            resp = serializeToJson(_outp);
            log("Responce json is : ", resp);
        }
        catch (Exception e) {
            _parse_ok = false;
            log(e.msg);
        }
    }

    if (_parse_ok) {
        return 0;
    }

    return -1;
    }`;

    return r;
}

/// JSON-RPC server wrapper around a service object of type `C`.
///
/// For every method reported by `IfInfo!C().getInfos()` a `_disp_<name>`
/// member is mixed in; `executeMethod` parses a request and routes it to the
/// matching dispatcher.
class ImplRpcSrvOfIf(C) {

    import std.traits : moduleName;
    import std.array : join;

    /// TODO: "working" workaround for importing identifiers related to C;
    /// need to find a more elegant way.
    enum _ifModuleName = moduleName!C;

    /// interface meta info
    enum _minfos = IfInfo!C().getInfos();

    //pragma(msg, "_ifinfos of Class: ", _minfos);

    private C _srvclass;

    /// Identifier of the service member, used inside the generated code.
    enum _srvclass_name = __traits(identifier, _srvclass);

    /// Stores the service object that the generated dispatchers call into.
    this(ref C srvclass) {
        _srvclass = srvclass;
    }

    /// mixin dispatch method.
    static foreach (m; _minfos) {
        //pragma(msg, _mixinDispatchMethod(m, _srvclass_name));
        static if (m._customtypes.length > 0) {
            mixin(`import ` ~ _ifModuleName ~ `: ` ~ m._customtypes.join(", ") ~ `;
            ` ~ _mixinDispatchMethod(m, _srvclass_name));
        }
        else {
            mixin(_mixinDispatchMethod(m, _srvclass_name));
        }
    }

    /// Routes `method` to its generated `_disp_<method>` function.
    ///
    /// Returns: the dispatcher's result, or -1 when `method` is unknown.
    private int _dispatch(string method, JSONValue params, out string resp) {
        int ret = 0;
        immutable string _case = _mixinDispatchCases(_minfos);
        //pragma(msg, _case);
        switch (method) {
            mixin(_case);
        }
        return ret;
    }

    /// Executes one RPC request.
    ///
    /// Params:
    ///   req  = request JSON text; the fields `id`, `method` and `params`
    ///          are read from it
    ///   resp = receives the response JSON text with the fields `method`,
    ///          `id`, `status`, `result` and `errinfo`
    ///
    /// Returns: 0 on success, -1 when the request could not be parsed or
    /// the dispatch failed.
    int executeMethod(string req, out string resp) {
        log("req is ", req);
        long id;
        string method;
        JSONValue params;
        JSONValue reqjson;
        string respjs;
        int ret = 0;
        try {
            reqjson = parseJSON(req);
            id = reqjson["id"].integer;
            method = reqjson["method"].str;
            params = reqjson["params"].object;
            ret = this._dispatch(method, params, respjs);
        }
        catch (Exception e) {
            log(e.msg);
            // A request that cannot be parsed must not be reported as success.
            ret = -1;
        }

        JSONValue respjobj;
        try {
            respjobj["method"] = JSONValue(method);
            respjobj["id"] = JSONValue(id);
            respjobj["status"] = JSONValue(ret);
        }
        catch (Exception e) {
            log(e.msg);
        }

        // No dispatcher output (failed request): report a JSON null result
        // instead of handing an empty string to the parser.
        respjobj["result"] = respjs.length > 0 ? parseJSON(respjs) : JSONValue(null);
        if (ret == 0) {
            respjobj["errinfo"] = "";
        }
        else {
            respjobj["errinfo"] = "Json rpc request failed!";
        }

        resp = respjobj.toString;
        log("Method retured Json-Str: ", resp);
        return ret;
    }
}

import std.concurrency : spawn, Tid, receiveOnly, send, ownerTid;

/// Thread RPC: waits for one request string, executes it and sends the
/// response string to the owner thread.
void thrdRPCSrvProcess (C) (ImplRpcSrvOfIf!C srvcls) {
    import std.concurrency : Tid, receiveOnly, send, ownerTid;
    auto recv = receiveOnly!string();
    string resp;
    srvcls.executeMethod(recv, resp);
    ownerTid.send(resp);
}

/// Thread RPC, non-blocking variant: waits at most `timeout_ms` milliseconds
/// for a request string; if one arrives it is executed and the response is
/// sent to the owner thread.
void thrdRPCSrvProcessNoneBlock (C) (ImplRpcSrvOfIf!C srvcls, int timeout_ms) {
    import std.concurrency : Tid, receiveTimeout, send, ownerTid;
    import core.thread : msecs;
    string recv, resp;
    const bool _received = receiveTimeout(msecs(timeout_ms), (string req) {
        recv = req;
    });
    if (_received) {
        srvcls.executeMethod(recv, resp);
        ownerTid.send(resp);
    }
}

/// Simple standard thread RPC server: mixes in `thrdRPCSrvThread`, which
/// builds the service from `CtorCall` and then processes requests forever,
/// sleeping 100 ms between requests.
mixin template thrdRPCSrvThreadTemplate(alias C, alias CtorCall) {
    void thrdRPCSrvThread() {
        import core.thread : msecs, Thread;
        auto _srvimpl = CtorCall;
        auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl);

        while (true) {
            thrdRPCSrvProcess!C(srvcls);
            Thread.sleep(msecs(100));
        }
    }
}

import std.socket : TcpSocket, Socket, SocketSet, SocketException;

/// TCP handler for RPC server
struct TcpHdl {
    TcpSocket tcpsock;      /// listening TCP socket
    SocketSet sockset;      /// socket set handed to `Socket.select`
    int _maxconn;           /// max number of TCP connections
    int _maxrecv;           /// receive buffer size
    bool close_after_resp;  /// close a connection once its socket was served
}

/// Simple standard TCP RPC server: mixes in `thrdRPCTCPSrv`, which builds the
/// service from `CtorCall` and serves requests arriving on the sockets of
/// `tcphdl` forever.
mixin template thrdRPCTcpSrvTemplate(alias C, alias CtorCall) {
    import std.socket : Socket, SocketSet, SocketException;
    void thrdRPCTCPSrv(TcpHdl * tcphdl) {
        import core.thread : msecs, Thread;
        import std.conv : to;
        auto _srvimpl = CtorCall;
        auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl);

        Socket[] reads;
        char[] buf = new char[tcphdl._maxrecv];
        string _req, _resp;

        // Serves every connection flagged readable by the last select().
        void readAndProcess() {
            import std.algorithm : remove;
            import std.socket : wouldHaveBlocked;

            // Index loop: `reads` shrinks while it is walked, so the index
            // only advances when the current entry is kept.
            size_t idx = 0;
            while (idx < reads.length) {
                auto r = reads[idx];
                if (!tcphdl.sockset.isSet(r)) {
                    ++idx;
                    continue;
                }

                bool drop = tcphdl.close_after_resp;
                auto _len = r.receive(buf);
                if (_len == Socket.ERROR) {
                    // A would-block result is not fatal; anything else means
                    // the connection is unusable.
                    if (!wouldHaveBlocked()) {
                        log("Socket receive error.");
                        drop = true;
                    }
                }
                else if (_len > 0) {
                    log("TCP-RPC server got cmd");
                    try {
                        _req = to!string(buf[0 .. _len]);
                        srvcls.executeMethod(_req, _resp);
                        r.send(_resp);
                    }
                    catch (Exception e) {
                        // Keep serving the remaining sockets.
                        log(e.msg);
                    }
                }
                else {
                    // receive() returned 0: the peer closed the connection.
                    // It must be dropped, otherwise select() reports it as
                    // readable on every round.
                    log("Socket connection closed.");
                    drop = true;
                }

                if (drop) {
                    r.close();
                    reads = reads.remove(idx);
                    log("TCP total connection : ", reads.length);
                }
                else {
                    ++idx;
                }
            }
        }

        // Accepts a pending connection, if the listening socket is flagged.
        void connCheck() {
            if (!tcphdl.sockset.isSet(tcphdl.tcpsock)) {
                return;
            }
            Socket sc = null;
            // Registered before accept() so that it actually guards it.
            scope (failure) {
                log(LogLevel.critical, "Error accepting");
                if (sc)
                    sc.close();
            }
            sc = tcphdl.tcpsock.accept();
            if (reads.length < tcphdl._maxconn) {
                reads ~= sc;
            }
            else {
                sc.close();
            }
        }

        while (true) {
            // Rebuild the set from scratch each round so sockets that were
            // closed and removed are no longer part of it.
            tcphdl.sockset.reset();
            tcphdl.sockset.add(tcphdl.tcpsock);
            foreach (sock; reads) {
                tcphdl.sockset.add(sock);
            }
            Socket.select(tcphdl.sockset, null, null);
            readAndProcess();
            connCheck();
        }
    }
}