1 module trpc.client; 2 3 import trpc.ifinfo; 4 import asdf; 5 import std.experimental.logger : log; 6 7 /// TODO _rpc_id should be mutex protected. 8 __gshared long _rpc_id = 1; 9 long getJsonRPCID() { 10 return _rpc_id++; 11 } 12 13 /// mixin a single method, seprate this makes code a bit more clear 14 /// rpcSendReqFuncname is: string funcname (string req, int timeout); 15 string _mixinSingleFunc(MethodInfo info, string rpcSendReqFuncname, int timeout) { 16 import std.array : join; 17 import std.conv : to; 18 /// function signature 19 string r = ""; 20 r ~= `//// RPC-Method [` ~ info.name ~ `] 21 ` ~ info.rettype ~ " " ~ info.name ~ "("; 22 // params 23 string[] _params = new string[info.params.length]; 24 foreach (i, p; info.params) { 25 if ((p.storetype == "out") || (p.storetype == "ref")) { 26 _params[i] ~= p.storetype ~ " " ~ p.type ~ " " ~ p.name; 27 } 28 else { 29 _params[i] ~= p.type ~ " " ~ p.name; 30 } 31 } 32 r ~= _params.join(", "); 33 r ~= `) { 34 import std.experimental.logger : log, LogLevel; 35 immutable _rpcName = "` ~ info.name ~ `"; 36 ` ~ info.rettype ~ ` _ret;`; 37 // function scope erea. 38 /// 39 /// now serialize all params and function name, return to a rpc call string or blob. 40 /// 41 import std.algorithm : map; 42 import std.conv: to; 43 import std.array : array; 44 string[] _outparams; string[] _inparams; string[] _outparam_names; 45 foreach (i, p; info.params) { 46 if (p.storetype == "out") { 47 _outparams ~= p.type ~ " " ~ p.name; 48 _outparam_names ~= p.name; 49 } 50 else { 51 _inparams ~= p.type ~ " " ~ p.name; 52 } 53 } 54 // in params outline 55 r ~= ` struct _InParams {` ~ _inparams.map!(a => a ~ "; ").join ~ `}`; 56 r ~= ` 57 _InParams _inprm; 58 `; 59 foreach (i, p; info.params) { 60 if (p.storetype != "out") { 61 r ~= `_inprm.` ~ p.name ~ `= ` ~ p.name ~ `;`; 62 } 63 } 64 r ~= ` 65 struct Jrpc2Req { 66 string method; 67 long id; 68 _InParams params; 69 } 70 `; 71 r ~= `Jrpc2Req _req; _req.id = getJsonRPCID(); ` ~ `_req.method = _rpcName; 72 `; 73 r ~= `_req.params = _inprm;`; 74 75 76 /// REQUEST 77 r ~= ` 78 string respstr = ` ~ rpcSendReqFuncname ~ `(serializeToJson(_req), ` ~ to!string(timeout) ~ `);`; 79 80 /// out-param 81 r ~= ` 82 struct _OutParams {` ~ info.rettype ~ ` ret; ` ~ _outparams.map!(a => a ~ "; ").join ~ `}`; 83 r ~= ` 84 struct Jrpc2Responce { 85 string method; 86 long id; 87 int status; 88 string errinfo; 89 _OutParams result; 90 } 91 `; 92 93 /// Get RESPONCE and Parse. 94 r ~= ` 95 try { 96 Jrpc2Responce _resp = respstr.deserialize!Jrpc2Responce; 97 if (_resp.method != _rpcName) { 98 log(LogLevel.error, "Ooops! Json method not match! calling <", _rpcName, ">, 99 got (", _resp.method ,")."); 100 } 101 else { 102 _ret = _resp.result.ret;`; 103 104 // re-assign out storage type params 105 foreach(n; _outparam_names) { 106 r ~= " " ~ n ~ ` = _resp.result.` ~ n ~ ";"; 107 } 108 109 r ~= 110 ` 111 } 112 } catch (Exception e) { 113 log(LogLevel.critical, "Err: Json responce parse error !"); 114 }`; 115 116 r ~= ` 117 return _ret; 118 }`; 119 return r; 120 } 121 122 /// mixin method body using Interface meta-info. 123 string[] mixinFuncStr(string ifModuleName, MethodInfo[] ifInfos, string rpcSendReqFuncname, int timeout) { 124 string[] _mtdstrs = new string[ifInfos.length]; 125 import std.array : join; 126 127 foreach (i, MethodInfo _info; ifInfos) { 128 if (_info._customtypes.length > 0) { 129 _mtdstrs[i] = `import ` ~ ifModuleName ~ `: ` ~ _info._customtypes.join(", ") ~ `; 130 ` ~ _mixinSingleFunc(_info, rpcSendReqFuncname, timeout); 131 } 132 else { 133 _mtdstrs[i] = _mixinSingleFunc(_info, rpcSendReqFuncname, timeout); 134 } 135 } 136 137 return _mtdstrs; 138 } 139 140 141 private struct CustomTypes { 142 string[] list; 143 } 144 145 /// Auto Implement method for a given interface. 146 class ImplIfToRpc(I, int TimeoutMs) 147 { 148 import std.traits: moduleName; 149 150 /// TODO: "working" workaround for importing identifieres related to I. 151 /// need to find a more-elegant-way 152 enum _ifModuleName = moduleName!I; 153 154 /// interface meta info 155 enum _ifinfos = IfInfo!I().getInfos(); 156 157 158 /// mixin method body 159 enum _SerFunStr = mixinFuncStr(_ifModuleName, _ifinfos, "rpcRequest", TimeoutMs); 160 161 /// 162 this() { 163 164 } 165 166 string rpcRequest(string req, int timeout) { 167 return ""; 168 } 169 170 static foreach (_str; _SerFunStr) { 171 //pragma(msg, _str); 172 mixin(_str); 173 } 174 } 175 176 /// Thread RPC client 177 class ThrdRPClient(I, int T) : ImplIfToRpc!(I, T) { 178 import std.concurrency : Tid, send, receiveTimeout; 179 import core.thread: msecs; 180 181 private Tid _srvThrdId; 182 183 /// 184 this(Tid srvThrdId) { 185 this._srvThrdId = srvThrdId; 186 } 187 188 override string rpcRequest(string req, int timeout) { 189 this._srvThrdId.send(req); 190 string _resp; 191 const bool _received = receiveTimeout(msecs(timeout), (string x) { 192 _resp = x; 193 }); 194 195 if (_received) { 196 return _resp; 197 } 198 199 return ""; 200 } 201 } 202 203 /// Thread RPC client 204 class ThrdRPCTcpClient(I, int T) : ImplIfToRpc!(I, T) { 205 import std.socket : TcpSocket; 206 import core.thread: msecs; 207 import std.conv : to; 208 209 private TcpSocket _tcpsock; 210 private size_t _max_rcv_len; 211 212 /// 213 this(TcpSocket tcpsock, size_t max_rcv_len) { 214 this._tcpsock = tcpsock; 215 this._max_rcv_len = max_rcv_len; 216 } 217 218 override string rpcRequest(string req, int timeout) { 219 ubyte[] recv_data = new ubyte[this._max_rcv_len]; 220 size_t len; 221 string _resp = ""; 222 try { 223 version (LogRPCInfo) { 224 log("sending .. <", req, ">..."); 225 } 226 _tcpsock.send(req); 227 version (LogRPCInfo) { 228 log("waiting rpc server reply..."); 229 } 230 int tries = timeout / 100; 231 if (_tcpsock.blocking) { 232 len = _tcpsock.receive(recv_data); 233 } 234 else { 235 while (tries--) { 236 len = _tcpsock.receive(recv_data); 237 if (len > 0) 238 break; 239 } 240 if (tries <= 0) { 241 log("Request timed out!"); 242 } 243 } 244 245 if (len > 0) { 246 _resp = to!string(cast(char[])recv_data[0 .. len]); 247 version (LogRPCInfo) { 248 log("got rply : ", _resp); 249 } 250 } 251 } 252 catch (Exception e) { 253 log(e.msg); 254 } 255 256 return _resp; 257 } 258 }