1 module trpc.client;
2 
3 import trpc.ifinfo;
4 import asdf;
5 import std.experimental.logger : log;
6 
7 /// TODO _rpc_id should be mutex protected.
8 __gshared long _rpc_id = 1; 
9 long getJsonRPCID() {
10 	return _rpc_id++;
11 }
12 
13 /// mixin a single method, seprate this makes code a bit more clear
14 /// rpcSendReqFuncname is:  string funcname (string req, int timeout);
15 string _mixinSingleFunc(MethodInfo info, string rpcSendReqFuncname, int timeout) {
16 	import std.array : join;
17 	import std.conv : to;
18 	/// function signature
19 	string r = "";
20 	r ~= `//// RPC-Method [` ~ info.name ~ `]
21 	` ~ info.rettype ~ " " ~ info.name ~ "(";
22 	// params
23 	string[] _params = new string[info.params.length];
24 	foreach (i, p; info.params) {
25 		if ((p.storetype == "out") || (p.storetype == "ref")) {
26 			_params[i] ~= p.storetype ~ " " ~ p.type ~ " " ~ p.name;
27 		}
28 		else {
29 			_params[i] ~= p.type ~ " " ~ p.name;
30 		}
31 	}
32 	r ~= _params.join(", ");
33 	r ~=  `) {
34 		import std.experimental.logger : log, LogLevel;
35 		immutable _rpcName = "` ~ info.name ~ `";
36 		` ~ info.rettype ~ ` _ret;`;
37 	// function scope erea.
38 	///
39 	/// now serialize all params and function name, return to a rpc call string or blob.
40 	///
41 	import std.algorithm : map;
42 	import std.conv: to;
43 	import std.array : array;
44 	string[] _outparams; string[] _inparams; string[] _outparam_names;
45 	foreach (i, p; info.params) {
46 		if (p.storetype == "out") {
47 			_outparams ~= p.type ~ " " ~ p.name;
48 			_outparam_names ~= p.name;
49 		}
50 		else {
51 			_inparams ~= p.type ~ " " ~ p.name;
52 		}
53 	}
54 	// in params outline
55 	r ~= ` struct _InParams {` ~  _inparams.map!(a => a ~ "; ").join ~ `}`;
56 	r ~= `
57 		_InParams _inprm;
58 	`;
59 	foreach (i, p; info.params) {
60 		if (p.storetype != "out") {
61 			r ~= `_inprm.` ~ p.name ~ `= ` ~ p.name ~ `;`;
62 		}
63 	}
64 	r ~= `
65 		struct Jrpc2Req {
66 			string method;
67 			long id;
68 			_InParams params;
69 		}
70 	`;
71 	r ~= `Jrpc2Req _req; _req.id = getJsonRPCID(); ` ~ `_req.method = _rpcName;
72 	`;
73 	r ~= `_req.params = _inprm;`;
74 
75 
76 	/// REQUEST
77 	r ~= `
78 		string respstr = ` ~ rpcSendReqFuncname ~ `(serializeToJson(_req), ` ~ to!string(timeout) ~ `);`;
79 
80 	/// out-param
81 	r ~= `
82 	struct _OutParams {` ~ info.rettype ~ ` ret; ` ~ _outparams.map!(a => a ~ "; ").join ~ `}`;
83 	r ~= `
84 		struct Jrpc2Responce {
85 			string method;
86 			long id;
87 			int status;
88 			string errinfo;
89 			_OutParams result;
90 		}
91 	`;
92 
93 	/// Get RESPONCE and Parse.
94 	r ~= `
95 		try {
96 			Jrpc2Responce _resp = respstr.deserialize!Jrpc2Responce;
97 			if (_resp.method != _rpcName) {
98 				log(LogLevel.error, "Ooops! Json method not match! calling <", _rpcName, ">, 
99 					got (", _resp.method ,").");
100 			}
101 			else {
102 				_ret = _resp.result.ret;`;
103 
104 	// re-assign out storage type params
105 	foreach(n; _outparam_names) {
106 		r ~= " " ~ n ~ ` = _resp.result.` ~ n ~ ";";
107 	}
108 
109 	r ~= 
110 	` 
111 			}
112 		} catch (Exception e) {
113 			log(LogLevel.critical, "Err: Json responce parse error !");
114 		}`;
115 
116 	r ~=  `
117 	return _ret;
118 	}`;
119 	return r;
120 }
121 
122 /// mixin method body using Interface meta-info.
123 string[] mixinFuncStr(string ifModuleName, MethodInfo[] ifInfos, string rpcSendReqFuncname, int timeout) {
124     string[] _mtdstrs = new string[ifInfos.length];
125     import std.array : join;
126     
127     foreach (i, MethodInfo _info; ifInfos) {
128         if (_info._customtypes.length > 0) {
129             _mtdstrs[i] = `import ` ~ ifModuleName ~ `: ` ~ _info._customtypes.join(", ") ~ `;
130         ` ~ _mixinSingleFunc(_info, rpcSendReqFuncname, timeout);
131         }
132         else {
133             _mtdstrs[i] = _mixinSingleFunc(_info, rpcSendReqFuncname, timeout);
134         }
135     }
136 
137     return _mtdstrs;
138 }
139 
140 
141 private struct CustomTypes {
142     string[] list;
143 }
144 
145 /// Auto Implement method for a given interface.
146 class ImplIfToRpc(I, int TimeoutMs) 
147 	{
148     import std.traits: moduleName;
149 
150     /// TODO: "working" workaround for importing identifieres related to I.
151     ///       need to find a more-elegant-way
152     enum _ifModuleName = moduleName!I;
153 
154     /// interface meta info
155     enum _ifinfos = IfInfo!I().getInfos();
156 
157 
158     /// mixin method body
159     enum _SerFunStr = mixinFuncStr(_ifModuleName, _ifinfos, "rpcRequest", TimeoutMs);
160 
161     ///
162     this() {
163         
164     }
165 
166 	string rpcRequest(string req, int timeout) {
167 		return "";
168 	}
169 
170     static foreach (_str; _SerFunStr) {
171         //pragma(msg, _str);
172         mixin(_str);
173     }
174 }
175 
176 /// Thread RPC client
177 class ThrdRPClient(I, int T) : ImplIfToRpc!(I, T) {
178 	import std.concurrency : Tid, send, receiveTimeout;
179 	import core.thread: msecs;
180 
181 	private Tid _srvThrdId;
182 
183 	/// 
184 	this(Tid srvThrdId) {
185 		this._srvThrdId = srvThrdId;
186 	}
187 
188 	override string rpcRequest(string req, int timeout) {
189 		this._srvThrdId.send(req);
190 		string _resp;
191 		const bool _received = receiveTimeout(msecs(timeout), (string x) {
192 			_resp = x;
193 		});
194 
195 		if (_received) {
196 			return _resp;
197 		}
198 
199 		return "";
200 	}
201 }
202 
203 /// Thread RPC client
204 class ThrdRPCTcpClient(I, int T) : ImplIfToRpc!(I, T) {
205 	import std.socket : TcpSocket;
206 	import core.thread: msecs;
207 	import std.conv : to;
208 
209 	private TcpSocket _tcpsock;
210 	private size_t _max_rcv_len;
211 
212 	/// 
213 	this(TcpSocket tcpsock, size_t max_rcv_len) {
214 		this._tcpsock = tcpsock;
215 		this._max_rcv_len = max_rcv_len;
216 	}
217 
218 	override string rpcRequest(string req, int timeout) {
219 		ubyte[] recv_data = new ubyte[this._max_rcv_len];
220 		size_t len;
221 		string _resp = "";
222 		try {
223 			version (LogRPCInfo) {
224 				log("sending .. <", req, ">...");
225 			}
226 			_tcpsock.send(req);
227 			version (LogRPCInfo) {
228 				log("waiting rpc server reply...");
229 			}
230 			int tries = timeout / 100;
231 			if (_tcpsock.blocking) {
232 				len = _tcpsock.receive(recv_data);
233 			}
234 			else {
235 				while (tries--) {
236 					len = _tcpsock.receive(recv_data);
237 					if (len > 0)
238 						break;
239 				}
240 				if (tries <= 0) {
241 					log("Request timed out!");
242 				}
243 			}
244 
245 			if (len > 0) {
246 				_resp = to!string(cast(char[])recv_data[0 .. len]);
247 				version (LogRPCInfo) {
248 					log("got rply : ", _resp);
249 				}
250 			}
251 		}
252 		catch (Exception e) {
253 			log(e.msg);
254 		}
255 		
256 		return _resp;
257 	}
258 }