Building a Server Engine

Author: Maarten Koopmans

Date: 21-Mar-2002

Resources: Rugby distribution

=toc

=== Building a server engine

If you want to start using REBOL for server applications, you may consider building a server engine for all network communications. In this article I'll describe the HIgh Performance Engine of Rugby, hipe. Hipe provides you with a TCP- and UDP-based server framework and a simple form of threading.

What kind of features do we expect from a server framework?

*TCP and UDP capability, as these are the dominant protocols of the Internet.
*Easy addition of protocol handlers built on top of hipe.
*Good performance.
*Some simple form of threading. This is useful for starting little tasks while we are in the network event loop.

So let's start our script with:

    REBOL []

    hipe-serv: make object! [

As you can see, we put everything inside an object! to prevent global namespace pollution.

Now let's think some more about the features we want, and how to implement them in terms of the data we need to keep.

First UDP, which is really simple in REBOL. You wait on a UDP port, and when it has an event you copy the data from the UDP datagram. In our case we just pass the data on to an associated handler function.

A TCP server typically works like this: a server port is listening, a client connects, and the server port accepts. Accepting gives the server a dedicated port for that client. All traffic on this port can then be 'handled'. So it is convenient to assign to every server port a function that acts as the handler for the protocol (or, better stated, IS the protocol). Of course, the server may also want to store some data per client port, so putting the client port and its associated data in a dynamically created object is handy.

=== Our data

    ;Our list of server ports
    server-ports: copy []

These are our server ports that listen for incoming connections.
We store them for later reference while processing network events.

    ;Our server to handler mapping
    server-map: copy []

If we give every server port a handler for its protocol, it's handy to store the handlers in a block as well, each one right after its port. Then we can use select to get the function! value of the appropriate handler later on.

    ; The list of ports we wait/all for in our main loop
    port-q: copy []

Of course we need to store all accepted client ports as well (the server ports go in here too, since we wait on them as well). Again for I/O processing, and whatever else comes to mind. But...

    ; Mapping of ports to objects containing additional info
    object-q: copy []

...we also wanted to add data bound to the accepted client port! Hence we put the accepted client port in a block, followed by an object! that holds its related data. So we can use select to... (yes, a pattern is emerging).

    ; Restricted server list
    restricted-server: make block! 20
    ; Server restrictions?
    restrict: no

The boolean restrict field indicates whether access to the services in this engine is restricted. If it is set to yes, restricted-server is a block of IP addresses that we accept connections from.

    ;The thread queue
    threads: copy []
    current-thread: none

These are the things we need for our threading engine: a block that holds the threads, and current-thread as a 'pointer' to the thread whose code runs next.

    conn-timeout: 0:0:30

This is the connection timeout. If a connection has no activity for this amount of time, it is dropped. This prevents a server from accumulating lots of open ports that don't do a thing.

    max-thread-waiting: 100

max-thread-waiting is a variable we use to balance network traffic and the threading engine. It works like this: we always process network events first. A thread gets a turn whenever a wait passes without network events; while there is network traffic, we only run a thread as well if more than max-thread-waiting threads are queued. The effect is that setting this number low gives the threading engine more time, and setting it high gives the networking part the edge.
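The 'select pattern' behind server-map and object-q is easy to try on its own. Below is a minimal sketch, assuming plain objects as stand-ins for real ports so that it runs without opening any sockets; port-a, port-b, echo-handler and upper-handler are hypothetical names for illustration only.

```rebol
REBOL [Title: "Flat key/value blocks (sketch)"]

; Hypothetical stand-ins for two server ports. Plain objects are used
; so the sketch runs without opening any sockets.
port-a: make object! [name: "a"]
port-b: make object! [name: "b"]

; Hypothetical handlers: each takes a string and returns a string
echo-handler: func [data [string!]] [data]
upper-handler: func [data [string!]] [uppercase copy data]

; Same layout as server-map: [key value key value ...]
server-map: copy []
append server-map port-a
append server-map :echo-handler
append server-map port-b
append server-map :upper-handler

; Lookup with select, as get-handler does
handler: select server-map port-b
print handler "hello"    ; prints HELLO

; Remove a key together with its value, as object-q-delete does
remove/part find server-map port-a 2
print length? server-map    ; prints 2
```

Because each value sits right after its key, select returns it directly, and removing with remove/part and a length of 2 keeps the remaining pairs aligned.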
=== Restricting access

We use two functions for restricting and validating access. The first is restrict-to, which simply sets our restrict variable to yes and appends a block of IP addresses to our restricted-server variable:

    restrict-to: func [
        {Sets server restrictions. The server will only serve
        to machines with the IP-addresses found in the list.}
        r [any-block!] {List of IP-addresses to serve.}
    ][
        restrict: yes
        append restricted-server r
    ]

Then we need a function that checks whether a connection from a given IP address is allowed:

    allow?: func [
        {Checks if a connection from the specified IP-address is allowed.}
        ip [tuple!] {IP-address to check.}
    ][
        return found? find restricted-server ip
    ]

This function is used when deciding whether or not to accept a connection.

=== Initializing a server

We need to be able to add server ports, associate them with handlers, and look them up. Finding out whether a given port (which has an event) is a server port is done by the is-server? function. It simply looks up whether or not the port is in the server-ports block.

    is-server?: func [
        {Check to see whether a given port is a server port.}
        p [port!]
    ][
        return found? find server-ports p
    ]

This implies that we need to add the server port to that block. And not only that, we need to associate it with a handler function! So add-server-port stores the port in the server-ports block, and stores an association between the port and the handler in server-map.

    add-server-port: func [
        {Adds a server port and its handler to the list and the map}
        p [port!]
        handler [any-function!]
    ][
        append server-ports p
        append server-map p
        append server-map :handler
        exit
    ]

And remove-server-port does, of course, the opposite. Note that it takes the port to remove as its argument:

    remove-server-port: func [
        {Removes a server from our list and map}
        p [port!]
    ][
        remove find server-ports p
        ; remove the port and the handler that follows it
        remove remove find server-map p
        exit
    ]

Now let's put it all together in a function init-server-port, which not only stores all information using the above functions, but also opens the port and increases the backlog. The backlog is the number of incoming connections that may wait for you before you accept them. The default is around 5. We open everything in no-wait mode by default, so you can write non-blocking handlers. You can always use set-modes to change this behaviour later on.

    init-server-port: func [
        {Initializes our main server port.}
        p [port! url!]
        conn-handler [any-function!]
        /local dest
    ][
        either url? p [dest: make port! p][dest: p]
        add-server-port dest :conn-handler
        append port-q dest
        ; Increase the backlog for this server. 15 should be possible
        ; (default is 5). REMOVE this for compatibility (e.g. on Mac)
        ; or set it to 5 or so. Set it on dest, not p: p may be a url!.
        dest/backlog: 15
        open/no-wait dest
    ]

=== The threading engine

First we need to decide on the layout of a thread. We choose a very simple model, where a thread is just an object. The object may have all kinds of fields, and some of the fields are blocks that will be executed using do. To tell the threading engine which code to execute, a thread object always has a field called code-pointer. A code-pointer is just that: it holds the word naming the next code block to do (you write it as a lit-word! in the object spec, e.g. code-pointer: 'start-monitor). So the threading engine simply does a

    do get/any in some-thread some-thread/code-pointer

What's cool about this is that you can use the code-pointer field to control the flow in a thread across multiple invocations. Note that a thread is itself responsible for handing control back (by finishing its block), a threading model commonly referred to as cooperative multithreading.
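To see the code-pointer idea in isolation, here is a minimal sketch of a thread object driven by a stripped-down dispatcher. The counter thread, its history and count fields, and run-once are hypothetical names for illustration; run-once mirrors only the two dispatch cases of the engine (run the block named by code-pointer, or run clean-up and finish) and leaves out error handling and the thread queue.

```rebol
REBOL [Title: "Cooperative thread object (sketch)"]

; A hypothetical thread: the fields holding blocks are steps,
; and code-pointer names the step to run on the next turn.
counter: make object! [
    code-pointer: 'step      ; the lit-word evaluates to the word step
    count: 0
    history: copy []
    step: [
        count: count + 1
        append history count
        if count >= 3 [code-pointer: 'finish]
    ]
    finish: [
        append history 'done
        code-pointer: 'clean-up   ; ask the dispatcher to clean up
    ]
    clean-up: [append history 'cleaned]
]

; Hypothetical stripped-down dispatcher: one turn per call.
; Returns true while the thread wants more turns.
run-once: func [t [object!]] [
    either 'clean-up = t/code-pointer [
        do t/clean-up
        false
    ][
        do get/any in t t/code-pointer
        true
    ]
]

while [run-once counter] []
probe counter/history   ; [1 2 3 done cleaned]
```

Each call runs exactly one block and returns; the thread decides what happens on its next turn simply by setting code-pointer, which is what makes the scheme cooperative.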
We will also add a special field, clean-up, which, if present, contains code that cleans up after a thread: for example, freeing resources such as files, network ports, database connections or...

Now let's add some functions. First, two functions to add and remove threads (which are just objects):

    add-thread: func [o [object!] {The thread to add}][
        append hipe-serv/threads o
        o
    ]

    remove-thread: func [o [object!] {The task to remove}][
        remove find head hipe-serv/threads o
    ]

And then, of course, our very simple threading engine. It works very simply. Upon invocation, the function process-thread picks the current thread and tries to do the block its code-pointer names. If that fails (an error has occurred), the special field clean-up is fetched from the thread object, if present. The code in clean-up can then free any additional resources, after which the thread is removed. If the thread has set its code-pointer to clean-up itself, the clean-up code is executed and the thread is removed from the queue. Finally, current-thread advances, so the threads take turns.

    process-thread: func [/local do-thread][
        do-thread: none
        ;Premature return
        if empty? hipe-serv/threads [exit]
        ;Are we initialized (cumbersome, but yes)
        if none? current-thread [current-thread: head hipe-serv/threads]
        ;Are we at the end of the queue?
        ;Note that this is an entry condition!
        if tail? current-thread [current-thread: head current-thread]
        ;What do we need to do?
        do-thread: pick current-thread 1
        if object? do-thread [
            either 'clean-up = do-thread/code-pointer [
                ;The thread asked to be cleaned up
                error? try [do do-thread/clean-up]
                remove-thread do-thread
            ][
                if error? try [do get/any in do-thread do-thread/code-pointer][
                    if found? find first do-thread 'clean-up [
                        ;do additional cleanup
                        error? try [do do-thread/clean-up]
                    ]
                    remove-thread do-thread
                ]
            ]
        ]
        if not tail? current-thread [current-thread: next current-thread]
    ]

=== Accepting connections

Now we need to add some logic to add and remove ports when a connection is accepted.
First, two functions for adding and removing the port 'as-is':

    port-q-delete: func [
        {Removes a port from our port list.}
        target [port!]
    ][
        remove find port-q target
    ]

    port-q-insert: func [
        {Inserts a port into our port list.}
        target [port!]
    ][
        append port-q target
    ]

The function get-handler returns the handler function associated with a given server port. It is used below.

    get-handler: func [
        {Returns the handler for a given server port}
        p [port!]
    ][
        return select server-map p
    ]

Remember how we decided at the beginning to associate a bunch of data with each client port, and to put that data in an object? That is what the following function does, and then it stores the object. As you can see, the object gets the port, the handler, a timestamp, and an empty field called user-data that you can use for anything you'd like when writing handlers.

    object-q-insert: func [
        {Inserts a port and its corresponding object into the object queue.}
        serv [port!] {The server port}
        target [port!] {The connection}
        /local o my-handler
    ][
        my-handler: get-handler serv
        append hipe-serv/object-q target
        o: make object! [
            port: target
            handler: :my-handler
            user-data: none
            lastaccess: now
        ]
        append hipe-serv/object-q o
    ]

If we can add, we want to delete as well! Here it goes:

    object-q-delete: func [
        {Removes a port and its corresponding object from the object queue.}
        target [port!]
    ][
        remove/part find hipe-serv/object-q target 2
    ]

On a higher level we want to simply 'start' or 'stop' a client port. This is what start and stop do, by calling the above functions:

    start: func [
        {Initializes everything for a client connection on application level.}
        serv [port!] {The server port}
        conn [port!] {The connection port}
    ][
        port-q-insert conn
        object-q-insert serv conn
    ]

    stop: func [
        {Cleans up after a client connection.}
        conn [port!]
    ][
        port-q-delete conn
        ; Close and forget the connection in separate steps, so that
        ; a failing close still removes the entry from object-q
        error? try [close conn]
        error? try [object-q-delete conn]
    ]

So we are ready for our main 'initializing' code for connections. Here is what we do:

*Are we in restricted mode? If yes, check the remote IP address and drop the connection if it is not allowed.
*Otherwise, start the connection.

    init-conn-port: func [
        {Initializes everything on network level.}
        serv [port!] {The server port}
        conn [port!] {The connection}
    ][
        either restrict [
            either allow? conn/remote-ip [
                start serv conn
            ][
                close conn
            ]
        ][
            ; No restrictions
            start serv conn
        ]
    ]

=== Monitoring timeouts

In the code above you saw that every initialized TCP connection receives a timestamp, lastaccess, which is refreshed whenever the connection has an event. We can use this timestamp to detect whether a connection has been idle too long. If it has, we may choose to drop it. The monitor function starts a thread that does just that, and it makes for a nice threading example as well!

    monitor: func [
        {Starts a thread that stops connections idle longer than conn-timeout.}
        /interval t {The interval time}
        /timeout t1 {The timeout time}
        /local int
    ][
        if timeout [hipe-serv/conn-timeout: t1]
        int: either interval [t][0:0:5]
        add-thread context [
            code-pointer: 'start-monitor
            interv: int
            last-run: now
            clean-up: []
            start-monitor: [
                error? try [
                    if now > (last-run + interv) [
                        foreach [p item] hipe-serv/object-q [
                            if now > (item/lastaccess + hipe-serv/conn-timeout) [
                                hipe-serv/stop item/port
                            ]
                        ]
                        self/last-run: now
                    ]
                ]
            ]
        ]
    ]

=== The main loop

YES! We are ready for our main loop! If you made it to this point you are almost there. Just hold on a little bit longer...

The first of our two functions is process-ports. It is given a block of ports and processes the events on them. For every item in the port block:

*If it is a server port and it is a UDP port: copy the data and pass it to the handler.
*If it is a server port and it is a TCP port: accept the connection and initialize it.
*If it is a TCP connection port: get the associated object, update the timestamp and call the handler.
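The three cases above boil down to a decision on two facts: whether the port is a server port, and its scheme. Here is a minimal sketch of just that decision, assuming plain objects with a scheme field as stand-ins for real ports (plus a hypothetical name field to keep them distinct); classify is a hypothetical helper that returns a word naming the action instead of performing it.

```rebol
REBOL [Title: "process-ports dispatch decision (sketch)"]

; Hypothetical stand-ins for ports: only the fields the decision needs.
; The name field keeps the two tcp objects distinguishable.
udp-server: make object! [name: "udp-server" scheme: 'udp]
tcp-server: make object! [name: "tcp-server" scheme: 'tcp]
client: make object! [name: "client" scheme: 'tcp]
server-ports: reduce [udp-server tcp-server]

; Returns, as a word, the action process-ports would take for a port
classify: func [item [object!]] [
    either find server-ports item [
        either item/scheme = 'udp ['udp-data]['accept]
    ][
        either item/scheme = 'tcp ['client-data]['ignore]
    ]
]

results: copy []
foreach p reduce [udp-server tcp-server client] [
    append results classify p
]
probe results   ; [udp-data accept client-data]
```

Ports that are neither server ports nor TCP connections fall through to 'ignore, just as process-ports silently skips them.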
We pass the object to the handler, so the handler has access to all the information.

    process-ports: func [
        {Processes all ports that have events.}
        portz [block!] {The port list}
        /local temp-obj
    ][
        foreach item portz [
            either is-server? item [
                either item/scheme = 'udp [
                    ;udp, so call our handler with the datagram
                    temp-obj: get-handler item
                    temp-obj copy item
                ][
                    ;tcp, so accept (first item) and initialize
                    init-conn-port item first item
                ]
            ][
                if item/scheme = 'tcp [
                    temp-obj: select hipe-serv/object-q item
                    temp-obj/lastaccess: now
                    temp-obj/handler temp-obj
                ]
            ]
        ]
    ]

And here is our main loop, called serve. We wait on all currently known ports, both server and client, with a timeout of 2 milliseconds (0.002 seconds). Then we look at what we get back. If it is none, there has been no network event in those two milliseconds, so we call process-thread. Otherwise we have a block of ports with events, and we call process-ports with that block. If more threads are waiting than max-thread-waiting allows, we process one anyway.

    serve: func [
        {Starts serving. Waits up to 2 ms at a time for network events.
        Processes threads in the background as well!}
        /local portz
    ][
        forever [
            portz: wait/all join port-q 0.002
            either none? portz [
                process-thread
            ][
                process-ports portz
                ;If more than max-thread-waiting threads are queued,
                ;process one anyway
                if hipe-serv/max-thread-waiting < length? hipe-serv/threads [
                    process-thread
                ]
            ]
        ]
    ]
    ] ; end of the hipe-serv object

=== A sample

So now we have this great engine. Can we use it, I hear you ask? Of course! Please take a look at the following handler:

    test-handler: func [o [object!] /local temp][
        set-modes o/port [no-wait: false]
        temp: copy/part o/port 8
        insert o/port temp
        hipe-serv/stop o/port
    ]

It first sets the port to blocking mode, for the sake of simplicity of the sample. If you don't do this, you must keep track of how much data you have read, what to do next and so on. If you want an example of that, just check out Rugby: it is nothing more than a non-blocking handler on top of hipe.

Back to our test-handler.
It copies the first 8 bytes of whatever it is sent (so you must send at least 8 bytes), sends them back, and then stops the port. Let's try it. In a first console, load hipe, define test-handler as above, and start the server:

    do %hipe.r
    hipe-serv/init-server-port tcp://:9002 :test-handler
    hipe-serv/serve

Now fire up a second console to act as a client:

    p: open tcp://localhost:9002
    insert p "1234567891011"
    print copy p    ; prints 12345678
    close p

Nice? Now write your own webserver, broker, ...

###