1 module trpc.server; 2 3 import trpc.ifinfo; 4 import asdf; 5 import std.json; 6 7 import std.experimental.logger : log, LogLevel; 8 9 private string _mixinDispatchCases(MethodInfo[] ifInfos) { 10 string s = ""; 11 foreach (m; ifInfos) { 12 s ~= ` 13 case "` ~ m.name ~ `":` ~ ` 14 ret = _disp_` ~ m.name ~ `(params, resp); 15 break;`; 16 } 17 18 return s ~ ` 19 default: ret = -1;`; 20 } 21 22 private struct ParamS { 23 string type; 24 string name; 25 } 26 27 private string _mixinDispatchMethod(MethodInfo info, string srvclass_name) { 28 import std.array : join; 29 ParamS[] _inParams, _outParams; 30 string r; 31 r ~= `//// RPC-Srv-Method [` ~ info.name ~ `] 32 ` ~ "int _disp_" ~ info.name ~ ` (JSONValue params, out string resp) { 33 bool _parse_ok = false; 34 `; 35 36 foreach (i, p; info.params) { 37 if (p.storetype == "out") { 38 _outParams ~= ParamS(p.type, p.name); 39 } 40 else { 41 _inParams ~= ParamS(p.type, p.name); 42 } 43 } 44 // in-param 45 r ~= `struct _InParams { `; 46 foreach (p; _inParams) { 47 r ~= p.type ~ " " ~ p.name ~ "; "; 48 } 49 r ~= `}`; 50 // out-param 51 r ~= ` struct _OutParams {`; 52 foreach (p; _outParams) { 53 r ~= p.type ~ " " ~ p.name ~ "; "; 54 } 55 r ~= info.rettype ~ ` ret; } _OutParams _outp;`; 56 57 // deserial-in-param 58 r ~= ` _InParams _inp; ` ~ info.rettype ~ ` ret; 59 try { 60 _inp = deserialize!_InParams(params.toString);`; 61 62 r ~= ` 63 ret = ` ~ srvclass_name ~ `.` ~ info.name ~ `(`; 64 string [] _pnames; 65 foreach (ap; info.params) { 66 if ((ap.storetype == "none") || (ap.storetype == "ref")) 67 _pnames ~= `_inp.` ~ ap.name; 68 if (ap.storetype == "out") 69 _pnames ~= `_outp.` ~ ap.name; 70 } 71 r ~= _pnames.join(", ") ~ `);`; 72 73 // out param assignment. 74 r ~= ` 75 _outp.ret = ret;`; 76 77 r ~= ` 78 _parse_ok = true; 79 } catch (Exception e) { 80 _parse_ok = false; 81 log("Error, deserialize In-Param of method error!"); 82 } 83 `; 84 85 // 86 r ~= ` 87 if (_parse_ok) { 88 try { 89 resp = serializeToJson(_outp); 90 version (LogRPCInfo) { 91 log("Responce json is : ", resp); 92 } 93 } 94 catch (Exception e) { 95 _parse_ok = false; 96 log(e.msg); 97 } 98 } 99 100 if (_parse_ok) { 101 return 0; 102 } 103 104 return -1; 105 }`; 106 107 return r; 108 } 109 110 /// 111 class ImplRpcSrvOfIf(C) { 112 113 import std.traits : moduleName; 114 import std.array : join; 115 116 /// TODO: "working" workaround for importing identifieres related to I. 117 /// need to find a more-elegant-way 118 enum _ifModuleName = moduleName!C; 119 120 /// interface meta info 121 enum _minfos = IfInfo!C().getInfos(); 122 123 //pragma(msg, "_ifinfos of Class: ", _minfos); 124 125 private C _srvclass; 126 127 /// 128 enum _srvclass_name = __traits(identifier, _srvclass); 129 130 /// 131 this(ref C srvclass) { 132 _srvclass = srvclass; 133 } 134 135 /// mixin dispatch method. 136 static foreach (m; _minfos) { 137 //pragma(msg, _mixinDispatchMethod(m, _srvclass_name)); 138 static if (m._customtypes.length > 0) { 139 mixin(`import ` ~ _ifModuleName ~ `: ` ~ m._customtypes.join(", ") ~ `; 140 ` ~ _mixinDispatchMethod(m, _srvclass_name)); 141 } 142 else { 143 mixin(_mixinDispatchMethod(m, _srvclass_name)); 144 } 145 } 146 147 private int _dispatch(string method, JSONValue params, out string resp) { 148 mixin(`int ret = 0;`); 149 immutable string _case = _mixinDispatchCases(_minfos); 150 //pragma(msg, _case); 151 switch (method) { 152 mixin(_case); 153 } 154 return ret; 155 } 156 157 /// execute rpc request 158 int executeMethod(string req, out string resp) { 159 version (LogRPCInfo) { 160 log("req is ", req); 161 } 162 long id; string method; JSONValue params; 163 JSONValue reqjson; string respjs; 164 int ret = 0; 165 try { 166 reqjson = parseJSON(req); 167 id = reqjson["id"].integer; 168 method = reqjson["method"].str; 169 params = reqjson["params"].object; 170 ret = this._dispatch(method, params, respjs); 171 } 172 catch (Exception e) { 173 log(e.msg); 174 } 175 176 JSONValue respjobj; 177 try { 178 respjobj["method"] = JSONValue(method); 179 respjobj["id"] = JSONValue(id); 180 respjobj["status"] = JSONValue(ret); 181 } 182 catch (Exception e) { 183 log(e.msg); 184 } 185 186 respjobj["result"] = parseJSON(respjs); 187 if (ret == 0) { 188 respjobj["errinfo"] = ""; 189 } 190 else { 191 respjobj["errinfo"] = "Json rpc request failed!"; 192 } 193 194 resp = respjobj.toString; 195 version (LogRPCInfo) { 196 log("Method retured Json-Str: ", resp); 197 } 198 return ret; 199 } 200 } 201 202 import std.concurrency : spawn, Tid, receiveOnly, send, ownerTid; 203 204 /// Thread-RPC method excute and responce deals 205 void thrdRPCSrvProcess (C) (ImplRpcSrvOfIf!C srvcls) { 206 import std.concurrency : Tid, receiveOnly, send, ownerTid; 207 auto recv = receiveOnly!string(); 208 string resp; 209 srvcls.executeMethod(recv, resp); 210 ownerTid.send(resp); 211 } 212 213 /// Thread-RPC method excute and responce deals - None-block 214 void thrdRPCSrvProcessNoneBlock (C) (ImplRpcSrvOfIf!C srvcls, int timeout_ms) { 215 import std.concurrency : Tid, receiveTimeout, send, ownerTid; 216 import core.thread : msecs; 217 string recv, resp; 218 const bool _received = receiveTimeout(msecs(timeout_ms), (string req) { 219 recv = req; 220 }); 221 if (_received) { 222 srvcls.executeMethod(recv, resp); 223 ownerTid.send(resp); 224 } 225 } 226 227 /// simple standard Thread RPC server 228 mixin template thrdRPCSrvThreadTemplate(alias C, alias CtorCall) { 229 void thrdRPCSrvThread() { 230 import core.thread : msecs, Thread; 231 auto _srvimpl = CtorCall; 232 auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl); 233 234 while (true) { 235 thrdRPCSrvProcess!C(srvcls); 236 Thread.sleep(msecs(100)); 237 } 238 } 239 } 240 241 import std.socket : TcpSocket, Socket, SocketSet, SocketException; 242 243 /// TCP handler for RPC server 244 struct TcpHdl { 245 TcpSocket tcpsock; /// TCP socket 246 SocketSet sockset; /// socket set 247 int _maxconn; /// max of TCP connections 248 int _maxrecv; /// max receive size 249 bool close_after_resp; /// only alow one connection 250 } 251 252 /// simple standard Thread RPC server 253 mixin template thrdRPCTcpSrvTemplate(alias C, alias CtorCall) { 254 import std.socket : Socket, SocketSet, SocketException; 255 import std.experimental.logger : log, LogLevel; 256 void thrdRPCTCPSrv(TcpHdl * tcphdl) { 257 import core.thread : msecs, Thread; 258 import std.conv : to; 259 import std.algorithm : remove; 260 auto _srvimpl = CtorCall; 261 auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl); 262 263 Socket[] reads; 264 char[] buf = new char[tcphdl._maxrecv]; 265 string _req, _resp; 266 267 void closeClientSocketConn(size_t i, ref Socket sock) { 268 try { 269 version (LogRPCInfo) { 270 log("Client connection from port ", sock.remoteAddress().toString(), 271 " was closed or in Exception, remove it!"); 272 } 273 } 274 catch (Exception e) { 275 276 } 277 278 sock.close(); 279 reads = reads.remove(i); 280 } 281 282 void readAndProcess() { 283 foreach (i, r; reads) { 284 if (tcphdl.sockset.isSet(r)) { 285 auto _len = r.receive(buf); 286 if (_len == Socket.ERROR) { 287 closeClientSocketConn(i, r); 288 continue; 289 } 290 else if (_len > 0) { 291 version (LogRPCInfo) { 292 log("TCP-RPC server got cmd"); 293 } 294 try { 295 _req = to!string(buf[0 .. _len]); 296 srvcls.executeMethod(_req, _resp); 297 r.send(_resp); 298 } 299 catch (Exception e) { 300 log(e.msg); 301 continue; 302 } 303 } 304 else { // receive returned 0 (EOF) means client just closed. 305 closeClientSocketConn(i, r); 306 continue; 307 } 308 309 if (tcphdl.close_after_resp) { 310 r.close(); 311 reads = reads.remove(i); 312 version (LogRPCInfo) { 313 log("TCP total connection : ", reads.length); 314 } 315 } 316 } 317 318 if (!r.isAlive) { 319 closeClientSocketConn(i, r); 320 continue; 321 } 322 } 323 } 324 325 void connCheck() { 326 if (reads.length < tcphdl._maxconn) { 327 if (tcphdl.sockset.isSet(tcphdl.tcpsock)) { 328 Socket sc = null; 329 try { 330 sc = tcphdl.tcpsock.accept(); 331 } 332 catch (Exception e) { 333 log(LogLevel.critical, e.msg); 334 return; 335 } 336 337 reads ~= sc; 338 339 scope (failure) { 340 log(LogLevel.critical, "Error accepting"); 341 if (sc) 342 sc.close(); 343 } 344 } 345 } 346 else { 347 log(LogLevel.critical, "ERRRRR! can not open more tcp connections!"); 348 } 349 } 350 351 while (true) { 352 tcphdl.sockset.add(tcphdl.tcpsock); 353 foreach (sock; reads) { 354 tcphdl.sockset.add(sock); 355 } 356 Socket.select(tcphdl.sockset, null, null); 357 readAndProcess(); 358 connCheck(); 359 // must reset sockset, otherwize, we can't accept on new conn. 360 tcphdl.sockset.reset(); 361 } 362 } 363 }