]> xenbits.xen.org Git - xenclient/toolstack.git/commitdiff
[eventloop,async_conn] restore the higher-level API in a separate async_conn module.
authorPrashanth Mundkur <prashanth.mundkur@citrix.com>
Tue, 30 Jun 2009 19:48:41 +0000 (12:48 -0700)
committerPrashanth Mundkur <prashanth.mundkur@citrix.com>
Tue, 30 Jun 2009 21:37:54 +0000 (14:37 -0700)
common/async_conn.ml [new file with mode: 0644]
common/async_conn.mli [new file with mode: 0644]

diff --git a/common/async_conn.ml b/common/async_conn.ml
new file mode 100644 (file)
index 0000000..80157bc
--- /dev/null
@@ -0,0 +1,172 @@
+(*
+ * Copyright (C) 2009      Citrix Ltd.
+ * Author Prashanth Mundkur <firstname.lastname@citrix.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *)
+
+let verbose = ref false
+
+let dbg fmt =
+       let logger s = if !verbose then Printf.printf "%s\n%!" s in
+       Printf.ksprintf logger fmt
+
+type t =
+{
+       ev_loop : Eventloop.t;
+       ev_handle : Eventloop.handle;
+       ev_fd : Unix.file_descr;
+
+       mutable callbacks : callbacks;
+       mutable send_done_enabled : bool;
+       mutable recv_enabled : bool;
+
+       send_buf : Buffer.t;
+}
+
+and callbacks =
+{
+       connect_callback : t -> unit;
+       recv_callback : t -> string -> (* offset *) int -> (* length *) int -> unit;
+       send_done_callback : t -> unit;
+       shutdown_callback : t -> unit;
+       error_callback : t -> Eventloop.error -> unit;
+}
+
+let compare t1 t2 = compare t1.ev_handle t2.ev_handle
+let hash t = Eventloop.handle_hash t.ev_handle
+
+module Conns = Connection_table.Make(struct type conn = t end)
+
+let accept_callback el h fd addr =
+       failwith "Async_conn.accept_callback: invalid use"
+
+let connect_callback el h =
+       let conn = Conns.get_conn h in
+       conn.callbacks.connect_callback conn
+
+let recv_ready_callback el h fd =
+       let conn = Conns.get_conn h in
+       if conn.recv_enabled then
+               let buflen = 512 in
+               let buf = String.create buflen in
+               try
+                       let read_bytes = Unix.read fd buf 0 buflen in
+                       if read_bytes = 0 then
+                               conn.callbacks.shutdown_callback conn
+                       else begin
+                               dbg "<- %s" (String.sub buf 0 read_bytes);
+                               conn.callbacks.recv_callback conn buf 0 read_bytes
+                       end
+               with
+               | Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
+               | Unix.Unix_error (Unix.EAGAIN, _, _)
+               | Unix.Unix_error (Unix.EINTR, _, _) ->
+                       ()
+               | Unix.Unix_error (ec, f, s) ->
+                       conn.callbacks.error_callback conn (ec, f, s)
+       else
+               Eventloop.disable_recv conn.ev_loop conn.ev_handle
+
+let send_ready_callback el h fd =
+       let conn = Conns.get_conn h in
+       let payload = Buffer.contents conn.send_buf in
+       let payload_len = String.length payload in
+       (try
+               (match Unix.write fd payload 0 payload_len with
+                | 0 -> ()
+                | sent ->
+                       dbg "-> %s" (String.sub payload 0 sent);
+                       Buffer.clear conn.send_buf;
+                       Buffer.add_substring conn.send_buf payload sent (payload_len - sent)
+               );
+               
+       with
+       | Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
+       | Unix.Unix_error (Unix.EAGAIN, _, _)
+       | Unix.Unix_error (Unix.EINTR, _, _) ->
+               ()
+       | Unix.Unix_error (ec, f, s) ->
+               conn.callbacks.error_callback conn (ec, f, s)
+       );
+       (* We may need to invoke the send_done_callback, but we may
+          have dispatched an error_callback above.  So we need to ensure
+          the connection is still active.
+       *)
+       if Conns.has_conn h && Buffer.length conn.send_buf = 0 then begin
+               Eventloop.disable_send conn.ev_loop conn.ev_handle;
+               if conn.send_done_enabled then
+                       conn.callbacks.send_done_callback conn
+       end
+
+let error_callback el h err =
+       let conn = Conns.get_conn h in
+       conn.callbacks.error_callback conn err
+
+let conn_callbacks =
+{
+       Eventloop.accept_callback = accept_callback;
+       Eventloop.connect_callback = connect_callback;
+       Eventloop.error_callback = error_callback;
+       Eventloop.recv_ready_callback = recv_ready_callback;
+       Eventloop.send_ready_callback = send_ready_callback;
+}
+
+let attach ev_loop fd  ?(enable_send_done=false) ?(enable_recv=true) callbacks =
+       let ev_handle = Eventloop.register_conn ev_loop fd ~enable_send:false ~enable_recv conn_callbacks in
+       let conn = { ev_loop = ev_loop;
+                    ev_handle = ev_handle;
+                    ev_fd = fd;
+                    callbacks = callbacks;
+                    send_done_enabled = enable_send_done;
+                    recv_enabled = enable_recv;
+                    send_buf = Buffer.create 16;
+                  }
+       in
+       Conns.add_conn ev_handle conn;
+       conn
+
+let detach conn =
+       Eventloop.remove_conn conn.ev_loop conn.ev_handle;
+       Conns.remove_conn conn.ev_handle
+
+let close conn =
+       (* It might already be detached; ignore this case. *)
+       (try detach conn with _ -> ());
+       (try Unix.close conn.ev_fd with _ -> ())
+
+let enable_send_done conn =
+       conn.send_done_enabled <- true
+
+let disable_send_done conn =
+       conn.send_done_enabled <- false
+
+let enable_recv conn =
+       Eventloop.enable_recv conn.ev_loop conn.ev_handle
+
+let disable_recv conn =
+       Eventloop.disable_recv conn.ev_loop conn.ev_handle
+
+let connect conn addr =
+       Eventloop.connect conn.ev_loop conn.ev_handle addr
+
+let send conn s =
+       Buffer.add_string conn.send_buf s;
+       Eventloop.enable_send conn.ev_loop conn.ev_handle
+
+let has_pending_send conn =
+       Buffer.length conn.send_buf > 0
+
+let set_callbacks conn callbacks =
+       conn.callbacks <- callbacks
+
+let get_handle conn = conn.ev_handle
+let get_eventloop conn = conn.ev_loop
diff --git a/common/async_conn.mli b/common/async_conn.mli
new file mode 100644 (file)
index 0000000..dde9327
--- /dev/null
@@ -0,0 +1,54 @@
+(*
+ * Copyright (C) 2009      Citrix Ltd.
+ * Author Prashanth Mundkur <firstname.lastname@citrix.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *)
+
+(* Convenience asynchronous connection, for use with the Eventloop
+   module.
+
+   This module provides a more convenient API for the IO callbacks.
+*)
+
+type t
+val compare : t -> t -> int
+val hash : t -> int
+
+type callbacks =
+{
+       connect_callback : t -> unit;
+       recv_callback : t -> string -> (* offset *) int -> (* length *) int -> unit;
+       send_done_callback : t -> unit;
+       shutdown_callback : t -> unit;
+       error_callback : t -> Eventloop.error -> unit;
+}
+
+val attach : Eventloop.t -> Unix.file_descr -> ?enable_send_done:bool -> ?enable_recv:bool -> callbacks -> t
+val detach : t -> unit
+val close  : t -> unit
+
+val enable_send_done  : t -> unit
+val disable_send_done : t -> unit
+
+val enable_recv  : t -> unit
+val disable_recv : t -> unit
+
+val connect : t -> Unix.sockaddr -> unit
+
+val send : t -> string -> unit
+val has_pending_send : t -> bool
+
+val set_callbacks : t -> callbacks -> unit
+
+val get_handle : t -> Eventloop.handle
+val get_eventloop : t -> Eventloop.t
+