
Provide runProcess' that allows returning a result? #306

Closed
teh opened this issue Feb 6, 2017 · 10 comments

@teh
Contributor

teh commented Feb 6, 2017

I'm new to h-d so I'm raising an issue for discussion before starting to submit PRs. Maybe I missed something obvious.

I propose to add a new function runProcess' :: LocalNode -> Process a -> IO a.

My main use case is interacting with the h-d ecosystem from other places, e.g. I have a wai Application that then uses call to invoke some stuff. In this specific case I use an MVar to get the result, but I have this pattern repeating in a few other places as well.

result' <- newEmptyMVar @HttpResponse
runProcess node $ do
  ...
-- must never fail; we prefer an exception over a blocked handler holding on to memory
Just (HttpResponse status body) <- tryTakeMVar result'
respond $ responseLBS status [] (BSL.fromStrict body)
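The proposed helper can be sketched by wrapping runProcess with an MVar; this is a hedged sketch of the idea, not an existing API:

```haskell
import Control.Concurrent.MVar
import Control.Distributed.Process (Process, liftIO)
import Control.Distributed.Process.Node (LocalNode, runProcess)

-- Hedged sketch of the proposed runProcess': run a Process action on the
-- given node and hand its result back to the calling IO thread via an MVar.
runProcess' :: LocalNode -> Process a -> IO a
runProcess' node proc = do
  resultVar <- newEmptyMVar
  runProcess node (proc >>= liftIO . putMVar resultVar)
  takeMVar resultVar
```

Note that if the Process action dies without producing a result, takeMVar blocks forever, which is exactly the timeout concern that comes up later in this thread.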
@hyperthunk
Member

hyperthunk commented Feb 6, 2017

If each handler in the wai app is single threaded (which it must be I'm guessing), then a more idiomatic approach would be to spawn a sibling process that is linked (as in thread linked, not process linked) to the handler thread. Spawn it with a TChan (or stm equivalent) or TQueue if you want to handle a backlog of requests, and use this to communicate with the Cloud Haskell process. If you need replies then pass two queues/channels. The sibling process would just sit around going forever $ readChan inChan >>= fwdReq target >> receive >>= writeChan outChan

You can tag messages if you need to sort the replies out.

Cloud Haskell is fundamentally a message-passing paradigm, so try not to fight that by making everything a synchronous call.
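Spelled out, the sibling loop described above might look roughly like this; the Request/Reply types, the target ProcessId, and the channel names are all assumptions for illustration:

```haskell
import Control.Concurrent.STM (TChan, atomically, readTChan, writeTChan)
import Control.Distributed.Process
import Control.Monad (forever)

-- Hedged sketch of the sibling process: shuttle requests from the handler
-- thread to a Cloud Haskell process, and replies back, over two TChans.
sibling :: (Serializable req, Serializable rep)
        => ProcessId -> TChan req -> TChan rep -> Process ()
sibling target inChan outChan = forever $ do
  req <- liftIO (atomically (readTChan inChan))
  send target req             -- forward the request to the CH server
  rep <- expect               -- wait for its reply
  liftIO (atomically (writeTChan outChan rep))
```

Tagging the request with a correlation id (as mentioned above) would let one sibling serve several handler threads over the same pair of channels.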

@hyperthunk
Member

So here is an example of a similar pattern, from the distributed-process-async package. This code is the heart of the implementation, and simply spawns a process that it happens to share a TMVar with.

A similar example from ManagedProcess.

We start with an enclosing type that abstracts the underlying communication pattern:

-- | Provides a means for servers to listen on a separate, typed /control/
-- channel, thereby segregating the channel from their regular
-- (and potentially busy) mailbox.
newtype ControlChannel m =
  ControlChannel {
      unControl :: (SendPort (Message m ()), ReceivePort (Message m ()))
    }

-- | Creates a new 'ControlChannel'.
newControlChan :: (Serializable m) => Process (ControlChannel m)
newControlChan = newChan >>= return . ControlChannel

-- | The writable end of a 'ControlChannel'.
--
newtype ControlPort m =
  ControlPort {
      unPort :: SendPort (Message m ())
    } deriving (Show)
deriving instance (Serializable m) => Binary (ControlPort m)
instance Eq (ControlPort m) where
  a == b = unPort a == unPort b

You can of course embed a non-cloud-haskell thing in there, such as a TChan or TQueue, and use liftIO in the cloud haskell code to read/write on it. Here we're using a cloud haskell typed channel, which works similarly and can be read inside a process' receive loop, thereby acting as an additional control plane for messaging.

We need to pass this to our server process, but we also want the call site to have access to the write end of the channel (but not the read end, obviously!), so we take an expression from the control channel to a process definition.

chanServe :: (Serializable b)
          => a
          -> InitHandler a s
          -> (ControlChannel b -> Process (ProcessDefinition s))
          -> Process ()
chanServe argv init mkDef = do
  pDef <- mkDef . ControlChannel =<< newChan
  runProcess (recvLoop pDef) argv init

The canonical pattern I've come across involves using vanilla concurrency constructs like MVar, TMVar, TChan, etc, to coordinate between code that runs in the Process monad and the parts of the application/s that don't.

Another simplifying factor can sometimes be to spawn your non-cloud-haskell threads/work/etc from cloud haskell, instead of the other way around. There's no rule that says a cloud haskell process can't lift to IO and stay there, servicing web requests or database queries. The huge advantage of doing things that way, as well, is that your IO code can then be supervised. Supervising complex child specs is quite similar to supervising managed processes (from distributed-process-client-server) that require clients to access their APIs via a handle, or that require MVar/STM based sharing with their clients or parent processes in a supervision tree.

Here are some examples from various interesting places (which you might find instructive in the various cloud haskell code bases):

There are many many more in the various test suites. Also, you can always write a custom back end for managed process (or perhaps more simply, a pool backend in the vein of this sort of thing).

The point I am making is that there are ways to cohesively integrate your CH and non-CH code, without coupling them in the wrong ways, and without always needing to spin up a process to make synchronous calls to a single server.

I would work through the various code bases and crib them for ideas, then experiment and benchmark your approaches. If you come up with ways to integrate and supervise processes from IO (and other monads) or vice versa, we should build up a common interface and publish it.

This has been raised before too - I'll look up the original thread.

@teh
Contributor Author

teh commented Feb 6, 2017

What an amazing response, thank you so much!

I feel that this could be made into a tutorial "interacting with non-CH code" almost verbatim. I certainly would have found a section on that very useful. I can start a PR for https://github.com/haskell-distributed/haskell-distributed.github.com if you want?

@teh
Contributor Author

teh commented Feb 6, 2017

I did end up with a single TMVar into which I place a response-tvar as well as the request itself. I'm not super happy because technically I now need timeouts in two places.

In practice I will just pretend that rpcProcess never dies which is a good enough approximation for my current playing around.

type ProcessChannel = TMVar (TMVar HttpResponse, HttpRequest)

initialProcessChannel :: IO ProcessChannel
initialProcessChannel = newEmptyTMVarIO

rpcProcess :: ProcessChannel -> Process ()
rpcProcess processChannel = forever $ do
  (replyVar, httpRequest) <- liftIO $ atomically $ takeTMVar processChannel

  let nid = NodeId (EndPointAddress "127.0.0.1:8081:0")
  response <- DClient.call (nid, "http" :: String) httpRequest
  liftIO $ atomically $ putTMVar replyVar response
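The same rendezvous shape can be exercised with base-only MVars standing in for Process and TMVar; a hedged sketch, where doubling the number stands in for the remote call:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forever)

-- Base-only analogue of rpcProcess: a worker loop takes (replyVar, request)
-- pairs from a shared MVar and posts each response back on its replyVar.
main :: IO ()
main = do
  reqVar <- newEmptyMVar :: IO (MVar (MVar Int, Int))
  _ <- forkIO $ forever $ do
    (replyVar, n) <- takeMVar reqVar
    putMVar replyVar (n * 2)  -- stand-in for the remote call
  replyVar <- newEmptyMVar
  putMVar reqVar (replyVar, 21)
  print =<< takeMVar replyVar  -- prints 42
```

The two-timeout worry above applies here too: the caller must guard both the put on reqVar (worker dead) and the take on replyVar (reply never arrives).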

@hyperthunk
Member

It's tough to know how to improve on that without better understanding how your code works.

The way Web servers typically work in Erlang is that each request is mapped to a spawned process, and of course you can pool them to minimise allocations etc. Obviously you don't want to be fighting against whatever threading/muxing model wai uses, so that might not be practical.

@hyperthunk
Member

> Obviously you don't want to be fighting against whatever threading/muxing model wai uses, so that might not be practical.

Having read up on the architecture of WAI, I don't think there's a neat way to make the handler a process, nor do I think there's a point in doing that either. The user-thread-per-request model clearly works well (with the parallel I/O manager in play) and I don't see the point in fighting what's clearly good.

I'm really unconvinced by that code snippet though - it just looks wrong. Why are you making a blocking remote call... I mean, sure, lots of code goes off and talks to a database or whatever mid-request, so in principle it's fine, but I suspect there are neater ways of doing this. Also, is that call a Managed Process invocation or Distributed.Process.Primitives.call btw, because they have quite different semantics.

> What an amazing response, thank you so much!
> I feel that this could be made into a tutorial "interacting with non-CH code" almost verbatim. I certainly would have found a section on that very useful.

That would be great, thank you! We will probably need to add quite a lot to it.

I did start looking at the snap http library, since it uses streams, and I wondered if it would be easier to fork a process and incrementally give back the results. The way Erlang's cowboy web server works is that it uses an underlying TCP socket acceptor pool, ranch, whose supervised acceptor processes handle incoming connections, then spawn a new process to handle the protocol layers for each one.

I guess the thing is that these kinds of design/architectural decisions tend to vary a lot depending on lots of factors. There is, for example, a network-transport-websockets implementation, which means it's quite possible to front a web server, use websockets to communicate with cloud haskell, and back again - since the n-t layer does that, it should work fine for CH code.

Also, introducing brokers and routing processes is often very useful intra-node. When we get into distributed stuff then it gets much more complex. Fun, but complex. :)

@teh
Contributor Author

teh commented Feb 7, 2017

(call is from d-p-client-server)

OK yea my code snippet isn't good. The main problem (other than it being synchronous) though is that error handling in CH is quite different. E.g. DiedDisconnect is not caught by a safeCall or tryCall. I haven't read all the library code yet, but it looks like I always need a monitor plus ProcessMonitorNotification handling to handle errors.

After going around in circles a few times I once again think some form of synchronous communication API that lives in IO and lets me e.g. callTimeout (a send/reply pair) against a LocalNode would be useful. I'd rather have just one set of concurrency primitives to deal with (CH) than two (by adding in STM).

@hyperthunk
Member

hyperthunk commented Feb 7, 2017

That's absolutely wrong @teh, DiedDisconnect is absolutely handled by safeCall and co. It will arrive as a monitor signal if the remote node goes down...?

@hyperthunk
Member

hyperthunk commented Feb 7, 2017

When you monitor a process, we absolutely fire that monitor if/when we detect the death of a peer node. The only issue you might have is that if no data is written between the two nodes, that disconnect might not be seen for quite some time. One solution to that (as per Erlang) is to have a heartbeat, which is very easy to implement in theory and I started adding an implementation in distributed-process-extras - however there are some complexities....

The way Erlang's net_kernel works, you have a tick sent to all connected nodes on the normal communication backbone every interval N. If a tick isn't seen from a peer within N*4 then the node is considered down.

One problem with this approach is that the tick messages can get stuck behind other, arbitrarily large payloads. That is not good. One approach to solving that problem could potentially be to open up a second connection between the nodes and use that in isolation for the tick messages, but now we're just papering over a design issue - whether you consider it to be a network-transport issue or a cloud haskell one. At the end of the day if it's the interface (rather than just the socket between two node controllers) that gets saturated, then you're potentially in for a bad day.

I don't think the node controller should do any ticking at all, and ideally you'd want the network-transport layer to handle this kind of thing (and smartly, separating its control plane from data transfer). There is currently no sane way for a process to tell its own node controller that it thinks a node is down either.

Ultimately, the simple approach right now is a heartbeat process that simply keeps track of connected nodes and regularly calls whereisRemoteAsync nodeId for each. I will put a simple module into -extras HEAD, but it's important to understand that this is a sticking plaster to keep traffic between nodes flowing. There is a deeper issue of interconnectivity between nodes that is impossible to solve for all cases (i.e., not all distributed systems want the same discovery/membership policy), and which we definitely don't want to be solving here.
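The interim heartbeat described above could look roughly like this; the interval, peer list, and registered name are assumptions, not the published module:

```haskell
import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Monad (forever)

-- Hedged sketch of a keep-alive loop: periodically resolve a name on each
-- peer node so that traffic keeps flowing between node controllers and a
-- lost connection surfaces promptly as DiedDisconnect.
heartbeat :: Int -> [NodeId] -> Process ()
heartbeat intervalMicros peers = forever $ do
  mapM_ (\nid -> whereisRemoteAsync nid "heartbeat") peers
  liftIO (threadDelay intervalMicros)
```

The WhereIsReply messages land in this process' mailbox; a real implementation would drain or match on them rather than letting them accumulate.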

Please let me know if you run into issues with safeCall at all - it will respond to a DiedDisconnect as I mentioned. I'll push the heartbeat implementation tomorrow once I've written some tests for it, and will add a test case in distributed-process-client-server, to ensure it works as advertised. Once that's done I'd like to close this issue, or at least move it over to -client-server, since it's not a core distributed-process thing we're trying to solve here. We can also discuss it further over in network-transport-{tcp, etc} if you like - it is relevant there, though IIRC this issue has been raised before.

@hyperthunk
Member

@teh - see haskell-distributed/distributed-process-extras@48984ef, which should ensure that when resolving a remote name fails, it doesn't break tryCall, safeCall, etc.

I will probably pull the exception handling up, since any Resolvable implementation that throws can do this to us otherwise, but I'm working my way up through the library layers in terms of getting things done, so I'll come to -client-server in due course.

Can I suggest we close this ticket and open a new one to discuss making interactions between non-CH and CH code easier? Also please see haskell-distributed/distributed-process-client-server#9.
