1 module rpcserver;
2 
3 import ifinfo;
4 import asdf;
5 import std.json;
6 
7 import std.experimental.logger : log, LogLevel;
8 
/**
 * Builds the `case` clauses that are mixed into the dispatch `switch`.
 *
 * For every method in `ifInfos` one clause is emitted that calls
 * `_disp_<name>(params, resp)`, stores the result in `ret` and breaks.
 * A final `default` clause sets `ret` to -1 for unknown method names.
 *
 * Params:
 *   ifInfos = meta information of the methods to dispatch to
 *
 * Returns: D source text, intended for use inside a `switch` body where
 *          `ret`, `params` and `resp` are in scope.
 */
private string _mixinDispatchCases(MethodInfo[] ifInfos) {
	string cases;
	foreach (method; ifInfos) {
		cases ~= "\n\t\t\tcase \"" ~ method.name ~ "\":";
		cases ~= "\n\t\t\t\tret = _disp_" ~ method.name ~ "(params, resp);";
		cases ~= "\n\t\t\t\tbreak;";
	}

	// Fallback for a method name that matches none of the cases above.
	cases ~= "\n\t\t\tdefault:  ret = -1;";
	return cases;
}
21 
/// Type/name pair of one method parameter, kept as source text for code generation.
private struct ParamS {
	/// Type of the parameter, spelled as D source text.
	string type;

	/// Identifier of the parameter.
	string name;
}
26 
/**
 * Generates the D source of the dispatch method `_disp_<name>` for one
 * RPC method.
 *
 * The generated method has the signature
 * `int _disp_<name>(JSONValue params, out string resp)` and
 * $(UL
 *   $(LI declares a local `_InParams` struct holding every parameter that
 *        is not `out`, and an `_OutParams` struct holding every `out`
 *        parameter plus a `ret` field of the method's return type;)
 *   $(LI fills `_InParams` with `deserialize!_InParams(params.toString)`;)
 *   $(LI calls `<srvclass_name>.<name>(...)` with the parameters in their
 *        declared order;)
 *   $(LI writes `serializeToJson(_outp)` to `resp`;)
 *   $(LI returns 0 on success and -1 if deserialization, the call itself or
 *        serialization threw an `Exception`.)
 * )
 *
 * `deserialize` and `serializeToJson` must be visible where the result is
 * mixed in (presumably they come from the `asdf` import - confirm).
 *
 * NOTE(review): values a callee writes through `ref` parameters are not part
 * of `_OutParams`, so they are not reported back in the response.
 * NOTE(review): `info.rettype` is used to declare variables; a method
 * returning `void` would presumably not compile - confirm against `ifinfo`.
 *
 * Params:
 *   info          = meta information (name, return type, parameters) of the method
 *   srvclass_name = identifier of the service object the call is made on
 *
 * Returns: the source text of the dispatch method.
 */
private string _mixinDispatchMethod(MethodInfo info, string srvclass_name) {
	import std.array : join;

	// Classify each parameter exactly once so that the generated structs and
	// the generated argument list can never disagree: everything that is not
	// `out` is an input and is passed from `_inp`.
	ParamS[] _inParams, _outParams;
	string[] _callArgs;
	foreach (p; info.params) {
		if (p.storetype == "out") {
			_outParams ~= ParamS(p.type, p.name);
			_callArgs ~= `_outp.` ~ p.name;
		}
		else {
			_inParams ~= ParamS(p.type, p.name);
			_callArgs ~= `_inp.` ~ p.name;
		}
	}

	string r;
	r ~= `//// RPC-Srv-Method [` ~ info.name ~ `]
	` ~ "int _disp_" ~ info.name ~ ` (JSONValue params, out string resp) {
		bool _parse_ok = false;
		`;

	// in-param
	r ~= `struct _InParams { `;
	foreach (p; _inParams) {
		r ~= p.type ~ " " ~ p.name ~ "; ";
	}
	r ~= `}`;

	// out-param (the return value travels in the extra field "ret")
	r ~= ` struct _OutParams {`;
	foreach (p; _outParams) {
		r ~= p.type ~ " " ~ p.name ~ "; ";
	}
	r ~= info.rettype ~ ` ret; } _OutParams _outp;`;

	// deserial-in-param
	r ~= ` _InParams _inp; ` ~ info.rettype ~ ` ret; 
		try {
			_inp = deserialize!_InParams(params.toString);`;

	// the actual service call
	r ~= `
			ret = ` ~ srvclass_name ~ `.` ~ info.name ~ `(` ~ _callArgs.join(", ") ~ `);`;

	// out param assignment.
	r ~= `
			_outp.ret = ret;`;

	// The try block covers both the deserialization and the service call, so
	// the exception text is logged to tell the two apart.
	r ~= `
			_parse_ok = true;
		} catch (Exception e) {
			_parse_ok = false;
			log("Error, deserialize In-Param of method error! ", e.msg);
		}
		`;

	// serialize the out-params into the response
	r ~= `
		if (_parse_ok) {
			try {
				resp = serializeToJson(_outp);
				log("Responce json is : ", resp);
			}
			catch (Exception e) {
				_parse_ok = false;
				log(e.msg);
			}
		}

		if (_parse_ok) {
			return 0;
		}

		return -1;
	}`;

	return r;
}
107 
/**
 * JSON-RPC front end for a service object of type `C`.
 *
 * For every method reported by `IfInfo!C().getInfos()` a `_disp_<name>`
 * method is mixed in (see `_mixinDispatchMethod`). `executeMethod` parses a
 * request, routes it to the matching dispatch method by name and builds the
 * JSON response.
 */
class ImplRpcSrvOfIf(C) {

	import std.traits : moduleName;
	import std.array : join;

	/// TODO: "working" workaround for importing identifiers related to C.
	///       need to find a more-elegant-way
	enum _ifModuleName = moduleName!C;

	/// interface meta info
	enum _minfos = IfInfo!C().getInfos();

	//pragma(msg, "_ifinfos of Class: ", _minfos);

	/// The service object the generated dispatch methods call into.
	private C _srvclass;

	/// Identifier of the service member, as used inside the generated code.
	enum _srvclass_name = __traits(identifier, _srvclass);

	/// Stores `srvclass` as the target of all dispatched calls.
	this(ref C srvclass) {
		_srvclass = srvclass;
	}

	/// mixin dispatch method.
	static foreach (m; _minfos) {
		//pragma(msg, _mixinDispatchMethod(m, _srvclass_name));
		static if (m._customtypes.length > 0) {
			// Bring the custom types used by this method into scope first.
			mixin(`import ` ~ _ifModuleName ~ `: ` ~ m._customtypes.join(", ") ~ `;
				` ~ _mixinDispatchMethod(m, _srvclass_name));
		}
		else {
			mixin(_mixinDispatchMethod(m, _srvclass_name));
		}
	}

	/**
	 * Routes a call to the generated `_disp_<method>` method.
	 *
	 * Params:
	 *   method = name of the RPC method
	 *   params = JSON object with the input parameters
	 *   resp   = receives the serialized out-parameters
	 *
	 * Returns: the dispatch method's result, or -1 for an unknown method name.
	 */
	private int _dispatch(string method, JSONValue params, out string resp) {
		int ret = 0;
		// Must be a compile-time constant because it is mixed in below.
		enum string _case = _mixinDispatchCases(_minfos);
		//pragma(msg, _case);
		switch (method) {
			mixin(_case);
		}
		return ret;
	}

	/**
	 * Executes one RPC request.
	 *
	 * The request must be a JSON object with the members `id` (integer),
	 * `method` (string) and `params` (object). The response is a JSON object
	 * with the members `method`, `id`, `status`, `result` and `errinfo`.
	 *
	 * Params:
	 *   req  = the request as JSON text
	 *   resp = receives the response as JSON text
	 *
	 * Returns: 0 on success; -1 if the request could not be parsed, the
	 *          method is unknown, the dispatch failed or its result is not
	 *          valid JSON. The same value is reported as `status`.
	 */
	int executeMethod(string req, out string resp) {
		log("req is ", req);
		long id; string method; JSONValue params;
		JSONValue reqjson; string respjs;
		int ret = 0;
		try {
			reqjson = parseJSON(req);
			id = reqjson["id"].integer;
			method = reqjson["method"].str;
			params = reqjson["params"].object;
			ret = this._dispatch(method, params, respjs);
		}
		catch (Exception e) {
			log(e.msg);
			// A request that cannot be parsed must not be reported as success.
			ret = -1;
		}

		// Parse the method result before the status is written, so a broken
		// result is reflected in the status. An empty result stays JSON null.
		JSONValue result;
		if (respjs.length > 0) {
			try {
				result = parseJSON(respjs);
			}
			catch (Exception e) {
				log(e.msg);
				ret = -1;
			}
		}

		JSONValue respjobj;
		respjobj["method"] = JSONValue(method);
		respjobj["id"] = JSONValue(id);
		respjobj["status"] = JSONValue(ret);
		respjobj["result"] = result;
		if (ret == 0) {
			respjobj["errinfo"] = "";
		}
		else {
			respjobj["errinfo"] = "Json rpc request failed!";
		}

		resp = respjobj.toString;
		log("Method retured Json-Str: ", resp);
		return ret;
	}
}
195 
196 import std.concurrency : spawn, Tid, receiveOnly, send, ownerTid;
197 
/// Thread RPC: blocks until one request string is received, executes it and
/// sends the response string to the owner thread.
void thrdRPCSrvProcess (C) (ImplRpcSrvOfIf!C srvcls) {
    import std.concurrency : ownerTid, receiveOnly, send;

    string request = receiveOnly!string();
    string reply;
    srvcls.executeMethod(request, reply);
    send(ownerTid, reply);
}
206 
/// Thread RPC, non-blocking variant: waits at most `timeout_ms` milliseconds
/// for one request string. If one arrives it is executed and the response is
/// sent to the owner thread; otherwise nothing is done.
void thrdRPCSrvProcessNoneBlock (C) (ImplRpcSrvOfIf!C srvcls, int timeout_ms) {
    import std.concurrency : ownerTid, receiveTimeout, send;
    import core.thread : msecs;

    string request;
    const bool gotRequest = receiveTimeout(msecs(timeout_ms),
                                           (string req) { request = req; });
    if (!gotRequest) {
        return;
    }

    string reply;
    srvcls.executeMethod(request, reply);
    send(ownerTid, reply);
}
220 
/// Mixin that declares `thrdRPCSrvThread`, a simple thread-based RPC server:
/// it creates the service from `CtorCall`, wraps it in `ImplRpcSrvOfIf!C` and
/// then serves one request after another, sleeping 100 ms between requests.
mixin template thrdRPCSrvThreadTemplate(alias C, alias CtorCall) {
    void thrdRPCSrvThread() {
        import core.thread : msecs, Thread;

        auto service = CtorCall;
        auto rpcServer = new ImplRpcSrvOfIf!C(service);

        // Serve forever; each round handles exactly one request.
        for (;;) {
            thrdRPCSrvProcess!C(rpcServer);
            Thread.sleep(msecs(100));
        }
    }
}
234 
235 import std.socket : TcpSocket, Socket, SocketSet, SocketException;
236 
/// Settings and sockets shared with the TCP RPC server.
struct TcpHdl {
    /// TCP socket on which new connections are accepted
    TcpSocket tcpsock;

    /// socket set used to wait for readable sockets
    SocketSet sockset;

    /// maximum number of TCP connections kept open at the same time
    int _maxconn;

    /// maximum receive size (size of the receive buffer)
    int _maxrecv;

    /// close a connection once it has been serviced
    bool close_after_resp;
}
245 
/**
 * Mixin that declares `thrdRPCTCPSrv`, a simple select()-based TCP RPC server.
 *
 * The server creates the service from `CtorCall`, wraps it in
 * `ImplRpcSrvOfIf!C` and then loops forever: it waits until the listening
 * socket or one of the client sockets becomes readable, executes each
 * received request with `executeMethod` and sends the response back on the
 * same connection.
 */
mixin template thrdRPCTcpSrvTemplate(alias C, alias CtorCall) {
    import std.socket : Socket, SocketSet, SocketException;

    /**
     * Runs the server loop; does not return normally.
     *
     * Params:
     *   tcphdl = listening socket, socket set and limits to use. The
     *            listening socket is expected to be bound and listening
     *            already - this function only calls `accept` on it.
     *
     * Throws: whatever `accept` throws; the failure is logged first.
     */
    void thrdRPCTCPSrv(TcpHdl * tcphdl) {
        import std.conv : to;
        import std.socket : wouldHaveBlocked;

        auto _srvimpl = CtorCall;
        auto srvcls = new ImplRpcSrvOfIf!C(_srvimpl);

        Socket[] reads;
        char[] buf = new char[tcphdl._maxrecv];
        string _req, _resp;

        // Serves every readable client socket once and rebuilds `reads`
        // without the connections that were closed on the way.
        void readAndProcess() {
            // Survivors are collected in a fresh array; removing from `reads`
            // while iterating over it would skip or repeat sockets.
            Socket[] kept;
            foreach (r; reads) {
                if (!tcphdl.sockset.isSet(r)) {
                    kept ~= r;
                    continue;
                }

                bool drop = tcphdl.close_after_resp;
                auto _len = r.receive(buf);
                if (_len == Socket.ERROR) {
                    // A real receive error leaves the socket unusable; keep it
                    // only if the call merely would have blocked.
                    if (!wouldHaveBlocked()) {
                        log("TCP-RPC receive failed, closing connection.");
                        drop = true;
                    }
                }
                else if (_len == 0) {
                    // The peer closed the connection. Keeping the socket would
                    // make select() report it as readable forever.
                    log("Socket connection closed.");
                    drop = true;
                }
                else {
                    log("TCP-RPC server got cmd");
                    try {
                        _req = to!string(buf[0 .. _len]);
                        srvcls.executeMethod(_req, _resp);
                        r.send(_resp);
                    }
                    catch (Exception e) {
                        // One failing request must not stop the other
                        // connections from being served.
                        log(e.msg);
                    }
                }

                if (drop) {
                    r.close();
                }
                else {
                    kept ~= r;
                }
            }

            if (kept.length != reads.length) {
                reads = kept;
                log("TCP total connection : ", reads.length);
            }
        }

        // Accepts a pending connection, if any, and enforces `_maxconn`.
        void connCheck() {
            if (!tcphdl.sockset.isSet(tcphdl.tcpsock)) {
                return;
            }

            Socket sc = null;
            // Registered before accept() so that it also covers a failing accept.
            scope (failure) {
                log(LogLevel.critical, "Error accepting");
                if (sc)
                    sc.close();
            }

            sc = tcphdl.tcpsock.accept();
            if (reads.length < tcphdl._maxconn) {
                reads ~= sc;
            }
            else {
                sc.close();
            }
        }

        while (true) {
            // Start from an empty set on every round so that no closed
            // handle lingers in it.
            tcphdl.sockset.reset();
            tcphdl.sockset.add(tcphdl.tcpsock);
            foreach (sock; reads) {
                tcphdl.sockset.add(sock);
            }

            // A result <= 0 (interruption) means the set does not describe
            // readable sockets; wait again instead of reading from it.
            if (Socket.select(tcphdl.sockset, null, null) <= 0) {
                continue;
            }

            readAndProcess();
            connCheck();
        }
    }
}