1 module trpc.server;
2 
3 import trpc.ifinfo;
4 import asdf;
5 import std.json;
6 
7 import std.experimental.logger : log, LogLevel;
8 
9 private string _mixinDispatchCases(MethodInfo[] ifInfos) {
10 	string s = "";
11 	foreach (m; ifInfos) {
12 		s ~= `
13 			case "` ~ m.name ~ `":` ~ `
14 				ret = _disp_` ~ m.name ~ `(params, resp);
15 				break;`;
16 	}
17 
18 	return s ~ `
19 			default:  ret = -1;`;
20 }
21 
22 private struct ParamS {
23 	string type;
24 	string name;
25 }
26 
27 private string _mixinDispatchMethod(MethodInfo info, string srvclass_name) {
28 	import std.array : join;
29 	ParamS[] _inParams, _outParams;
30 	string r;
31 	r ~= `//// RPC-Srv-Method [` ~ info.name ~ `]
32 	` ~ "int _disp_" ~ info.name ~ ` (JSONValue params, out string resp) {
33 		bool _parse_ok = false;
34 		`;
35 
36 	foreach (i, p; info.params) {
37 		if (p.storetype == "out") {
38 			_outParams ~= ParamS(p.type, p.name);
39 		}
40 		else {
41 			_inParams ~= ParamS(p.type, p.name);
42 		}
43 	}
44 	// in-param
45 	r ~= `struct _InParams { `;
46 	foreach (p; _inParams) {
47 		r ~= p.type ~ " " ~ p.name ~ "; ";
48 	}
49 	r ~= `}`;
50 	// out-param
51 	r ~= ` struct _OutParams {`;
52 	foreach (p; _outParams) {
53 		r ~= p.type ~ " " ~ p.name ~ "; ";
54 	}
55 	r ~= info.rettype ~ ` ret; } _OutParams _outp;`;
56 
57 	// deserial-in-param
58 	r ~= ` _InParams _inp; ` ~ info.rettype ~ ` ret; 
59 		try {
60 			_inp = deserialize!_InParams(params.toString);`;
61 
62 	r ~= `
63 			ret = ` ~ srvclass_name ~ `.` ~ info.name ~ `(`;
64 	string [] _pnames;
65 	foreach (ap; info.params) {
66 		if ((ap.storetype == "none") || (ap.storetype == "ref"))
67 			_pnames ~= `_inp.` ~ ap.name;
68 		if (ap.storetype == "out")
69 			_pnames ~= `_outp.` ~ ap.name;
70 	}
71 	r ~= _pnames.join(", ") ~ `);`;
72 
73 	// out param assignment.
74 	r ~= `
75 			_outp.ret = ret;`;
76 
77 	r ~= `
78 			_parse_ok = true;
79 		} catch (Exception e) {
80 			_parse_ok = false;
81 			log("Error, deserialize In-Param of method error!");
82 		}
83 		`;
84 
85 	//
86 	r ~= `
87 		if (_parse_ok) {
88 			try {
89 				resp = serializeToJson(_outp);
90 				version (LogRPCInfo) {
91 					log("Responce json is : ", resp);
92 				}
93 			}
94 			catch (Exception e) {
95 				_parse_ok = false;
96 				log(e.msg);
97 			}
98 		}
99 
100 		if (_parse_ok) {
101 			return 0;
102 		}
103 
104 		return -1;
105 	}`;
106 
107 	return r;
108 }
109 
110 ///
111 class ImplRpcSrvOfIf(C) {
112 	
113 	import std.traits : moduleName;
114     import std.array : join;
115 
116     /// TODO: "working" workaround for importing identifieres related to I.
117     ///       need to find a more-elegant-way
118     enum _ifModuleName = moduleName!C;
119 
120     /// interface meta info
121     enum _minfos = IfInfo!C().getInfos();
122 
123 	//pragma(msg, "_ifinfos of Class: ", _minfos);
124 
125 	private  C _srvclass;
126 
127 	///
128 	enum _srvclass_name = __traits(identifier, _srvclass);
129 
130 	///
131 	this(ref C srvclass) {
132 		_srvclass = srvclass;
133 	}
134 
135 	/// mixin dispatch method.
136 	static foreach (m; _minfos) {
137 		//pragma(msg, _mixinDispatchMethod(m, _srvclass_name));
138         static if (m._customtypes.length > 0) {
139             mixin(`import ` ~ _ifModuleName ~ `: ` ~ m._customtypes.join(", ") ~ `;
140                 ` ~ _mixinDispatchMethod(m, _srvclass_name));
141         }
142         else {
143             mixin(_mixinDispatchMethod(m, _srvclass_name));
144         }
145 	}
146 
147 	private int _dispatch(string method, JSONValue params, out string resp) {
148 		mixin(`int ret = 0;`);
149 		immutable string _case = _mixinDispatchCases(_minfos);
150 		//pragma(msg, _case);
151 		switch (method) {
152 			mixin(_case);
153 		}
154 		return ret;
155 	}
156 
157 	/// execute rpc request
158 	int executeMethod(string req, out string resp) {
159 		version (LogRPCInfo) {
160 			log("req is ", req);
161 		}
162 		long id; string method; JSONValue params;
163 		JSONValue reqjson; string respjs;
164 		int ret = 0;
165 		try {
166 			reqjson = parseJSON(req);
167 			id = reqjson["id"].integer;
168 			method = reqjson["method"].str;
169 			params = reqjson["params"].object;
170 			ret = this._dispatch(method, params, respjs);
171 		}
172 		catch (Exception e) {
173 			log(e.msg);
174 		}
175 
176 		JSONValue respjobj;
177 		try {
178 			respjobj["method"] = JSONValue(method);
179 			respjobj["id"] = JSONValue(id);
180 			respjobj["status"] = JSONValue(ret);
181 		}
182 		catch (Exception e) {
183 			log(e.msg);
184 		}
185 		
186 		respjobj["result"] = parseJSON(respjs);
187 		if (ret == 0) {
188 			respjobj["errinfo"] = "";
189 		}
190 		else {
191 			respjobj["errinfo"] = "Json rpc request failed!";
192 		}
193 
194 		resp = respjobj.toString;
195 		version (LogRPCInfo) {
196 			log("Method retured Json-Str: ", resp);
197 		}
198 		return ret;
199 	}
200 }
201 
202 import std.concurrency : spawn, Tid, receiveOnly, send, ownerTid;
203 
204 /// Thread-RPC method excute and responce deals
205 void thrdRPCSrvProcess (C) (ImplRpcSrvOfIf!C srvcls) {
206     import std.concurrency : Tid, receiveOnly, send, ownerTid;
207     auto recv = receiveOnly!string();
208     string resp;
209     srvcls.executeMethod(recv, resp);
210     ownerTid.send(resp);
211 }
212 
213 /// Thread-RPC method excute and responce deals - None-block
214 void thrdRPCSrvProcessNoneBlock (C) (ImplRpcSrvOfIf!C srvcls, int timeout_ms) {
215     import std.concurrency : Tid, receiveTimeout, send, ownerTid;
216     import core.thread : msecs;
217     string recv, resp;
218     const bool _received = receiveTimeout(msecs(timeout_ms), (string req) {
219                                                         recv = req;
220                                                         });
221     if (_received) {
222         srvcls.executeMethod(recv, resp);
223         ownerTid.send(resp);
224     }
225 }
226 
227 /// simple standard Thread RPC server
228 mixin template thrdRPCSrvThreadTemplate(alias C, alias CtorCall) {
229     void thrdRPCSrvThread() {
230         import core.thread : msecs, Thread;
231         auto _srvimpl = CtorCall;
232         auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl);
233 
234         while (true) {
235             thrdRPCSrvProcess!C(srvcls);
236             Thread.sleep(msecs(100));
237         }
238     }
239 }
240 
241 import std.socket : TcpSocket, Socket, SocketSet, SocketException;
242 
243 /// TCP handler for RPC server
244 struct TcpHdl {
245     TcpSocket tcpsock;  		/// TCP socket
246     SocketSet sockset;  		/// socket set
247     int _maxconn;       		/// max of TCP connections
248 	int _maxrecv;       		/// max receive size
249 	bool close_after_resp;		/// only alow one connection 
250 }
251 
252 /// simple standard Thread RPC server
253 mixin template thrdRPCTcpSrvTemplate(alias C, alias CtorCall) {
254     import std.socket : Socket, SocketSet, SocketException;
255 	import std.experimental.logger : log, LogLevel;
256     void thrdRPCTCPSrv(TcpHdl * tcphdl) {
257         import core.thread : msecs, Thread;
258         import std.conv : to;
259         import std.algorithm : remove;
260         auto _srvimpl = CtorCall;
261         auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl);
262 
263         Socket[] reads;
264         char[] buf = new char[tcphdl._maxrecv];
265         string _req, _resp;
266 
267 		void closeClientSocketConn(size_t i, ref Socket sock) {
268 			try {
269 				version (LogRPCInfo) {
270 					log("Client connection from port ", sock.remoteAddress().toString(), 
271 						" was closed or in Exception, remove it!");
272 				}
273 			}
274 			catch (Exception e) {
275 
276 			}
277 			
278 			sock.close();
279 			reads = reads.remove(i);
280 		}
281 
282 		void readAndProcess() {
283             foreach (i, r; reads) {
284                 if (tcphdl.sockset.isSet(r)) {
285                     auto _len = r.receive(buf);
286                     if (_len == Socket.ERROR) {
287                         closeClientSocketConn(i, r);
288 						continue;
289                     }
290                     else if (_len > 0) {
291 						version (LogRPCInfo) {
292 							log("TCP-RPC server got cmd");
293 						}
294                         try {
295                             _req = to!string(buf[0 .. _len]);
296                             srvcls.executeMethod(_req, _resp);
297                             r.send(_resp);
298                         }
299                         catch (Exception e) {
300                             log(e.msg);
301                             continue;
302                         }
303                     }
304                     else { // receive returned 0 (EOF) means client just closed.
305                         closeClientSocketConn(i, r);
306 						continue;
307                     }
308 
309 					if (tcphdl.close_after_resp) {
310 						r.close();
311 						reads = reads.remove(i);
312 						version (LogRPCInfo) {
313 							log("TCP total connection : ", reads.length);
314 						}
315 					}
316                 }
317 
318 				if (!r.isAlive) {
319 					closeClientSocketConn(i, r);
320 					continue;
321 				}
322             }
323         }
324 
325         void connCheck() {
326 			if (reads.length < tcphdl._maxconn) {
327 				if (tcphdl.sockset.isSet(tcphdl.tcpsock)) {
328 					Socket sc = null;
329 					try {
330 						sc = tcphdl.tcpsock.accept();
331 					}
332 					catch (Exception e) {
333 						log(LogLevel.critical, e.msg);
334 						return;
335 					}
336 
337 					reads ~= sc;
338 
339 					scope (failure) {
340 						log(LogLevel.critical, "Error accepting");
341 						if (sc)
342 							sc.close();
343 					}
344 				}
345 			}
346             else {
347 				log(LogLevel.critical, "ERRRRR! can not open more tcp connections!");
348 			}
349         }
350 
351         while (true) {
352             tcphdl.sockset.add(tcphdl.tcpsock);
353             foreach (sock; reads) {
354                 tcphdl.sockset.add(sock);
355             }
356             Socket.select(tcphdl.sockset, null, null);
357 			readAndProcess();
358 			connCheck();
359 			// must reset sockset, otherwize, we can't accept on new conn.
360 			tcphdl.sockset.reset();
361         }
362     }
363 }