Skip to content

Commit

Permalink
added server-side change chat username
Browse files Browse the repository at this point in the history
  • Loading branch information
oen9 committed May 7, 2020
1 parent ad0ddc5 commit 75523ed
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 70 deletions.
2 changes: 1 addition & 1 deletion jvm/src/main/scala/example/Hello.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object Hello extends App {
val userRepo = doobieTran >>> userRepository.UserRepository.live
val authServ = (userRepo ++ logging ++ cryptoServ) >>> authService.AuthService.live

val chatServ = chatService.ChatService.live
val chatServ = logging >>> chatService.ChatService.live
val chatFlowBuil = (chatServ ++ logging) >>> chatFlowBuilder.ChatFlowBuilder.live

logging ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ class ChatFlowBuilderLive(
out <- Queue.unbounded[Task[*], Dto.ChatDto]

user <- chatService.createUser(out)
_ <- logger.log(LogLevel.Trace)(s"user ${user.id} connected")
_ <- logger.log(LogLevel.Trace)(s"userId ${user.id}: connected")

qinLogic = in.dequeue
.through(handleMsg[R](user))
.through(handleMsg[R](user.id))
.merge(inEndChannel.dequeue)
.unNoneTerminate
_ <- (for {
res <- qinLogic.compile.drain
_ <- logger.log(LogLevel.Trace)(s"${user.id} Fiber ended")
_ <- logger.log(LogLevel.Trace)(s"userId ${user.id}: disconnected")
} yield ()).forkDaemon

outStream = out.dequeue.map(toWsFrame)
Expand All @@ -49,13 +49,13 @@ class ChatFlowBuilderLive(
_ <- chatService.handleServerMsg(Dto.ChatUserLeft(u))
} yield ()

def handleMsg[R](u: Dto.ChatUser): Pipe[RIO[R, *], WebSocketFrame, Option[Unit]] =
def handleMsg[R](userId: Int): Pipe[RIO[R, *], WebSocketFrame, Option[Unit]] =
_.collect {
case WebSocketFrame.Text(msg, _) => fromWsFrame(msg)
}.evalMap(msg =>
for {
_ <- logger.log(LogLevel.Trace)(s"rcv $u : $msg")
_ <- chatService.handleUserMsg(u, msg)
_ <- logger.log(LogLevel.Trace)(s"userId : $userId -> msg : $msg")
_ <- chatService.handleUserMsg(userId, msg)
} yield msg
)
.dropWhile(_ => true)
Expand Down
72 changes: 12 additions & 60 deletions jvm/src/main/scala/example/modules/services/ChatService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,87 +3,39 @@ package example.modules.services
import cats.implicits._
import example.model.ChatData.User
import example.shared.Dto
import example.shared.Dto.ChangeChatName
import example.shared.Dto.ChatMsg
import example.shared.Dto.ChatUserLeft
import example.shared.Dto.NewChatUser
import example.shared.Dto.UnknownData
import fs2.concurrent.Queue
import io.scalaland.chimney.dsl._
import zio._
import zio.logging.Logging

object chatService {
type ChatService = Has[ChatService.Service]

object ChatService {
trait Service {
def createUser(out: Queue[Task[*], Dto.ChatDto]): Task[Dto.ChatUser]
def handleUserMsg(u: Dto.ChatUser, msg: Dto.ClientMsg): Task[Unit]
def handleUserMsg(userId: Int, msg: Dto.ClientMsg): Task[Unit]
def handleServerMsg(msg: Dto.ServerMsg): Task[Unit]
}

val live: ZLayer[Any, Throwable, ChatService] =
ZLayer.fromEffect(for {
users <- Ref.make(Vector[User]())
idCounter <- Ref.make(1)
} yield new Service {

def zioUnit: ZIO[Any, Throwable, Unit] = ZIO.unit // Nothing -> Throwable
def broadcast(msg: Dto.ChatDto): ZIO[Any, Throwable, Unit] =
for {
users <- users.get
_ <- users.foldLeft(zioUnit)((acc, rcvU) => acc *> rcvU.out.enqueue1(msg))
} yield ()

def getUsers(): zio.Task[Dto.ChatUsers] =
for {
users <- users.get
dtoUsers = users.map(_.into[Dto.ChatUser].transform)
} yield Dto.ChatUsers(dtoUsers.toSet)

def createUser(out: Queue[Task[*], Dto.ChatDto]): Task[Dto.ChatUser] =
for {
id <- idCounter.modify(id => (id, id + 1))
u = User(id = id, name = "unknown", out = out)
dtoU = u.into[Dto.ChatUser].transform

_ <- out.enqueue1(dtoU)
_ <- handleServerMsg(Dto.NewChatUser(dtoU))

_ <- users.update(_ :+ u)
dtoUsers <- getUsers()
_ <- out.enqueue1(dtoUsers)
} yield dtoU

def handleUserMsg(u: Dto.ChatUser, msg: Dto.ClientMsg): Task[Unit] = msg match {
case chatMsg: ChatMsg =>
val msgToBroadcast = chatMsg.copy(user = u.some)
broadcast(msgToBroadcast)

case ud: UnknownData =>
for {
users <- users.get
maybeUser = users.find(_.id == u.id)
_ <- maybeUser.fold(zioUnit)(_.out.enqueue1(ud))
} yield ()
}

def handleServerMsg(msg: Dto.ServerMsg): zio.Task[Unit] = msg match {
case nu: NewChatUser =>
broadcast(nu)
case ul @ ChatUserLeft(u) =>
for {
_ <- users.update(_.filter(_.id != u.id))
_ <- broadcast(ul)
} yield ()
}

})
val live: ZLayer[Any with Logging, Throwable, ChatService] =
ZLayer.fromServiceM[Logging.Service, Any, Throwable, ChatService.Service] { logging =>
for {
users <- Ref.make(Vector[User]())
idCounter <- Ref.make(1)
} yield new ChatServiceLive(users, idCounter, logging.logger)
}
}

def createUser(out: Queue[Task[*], Dto.ChatDto]): ZIO[ChatService, Throwable, Dto.ChatUser] =
ZIO.accessM[ChatService](_.get.createUser(out))
def handleUserMsg(u: Dto.ChatUser, msg: Dto.ClientMsg): ZIO[ChatService, Throwable, Unit] =
ZIO.accessM[ChatService](_.get.handleUserMsg(u, msg))
def handleUserMsg(userId: Int, msg: Dto.ClientMsg): ZIO[ChatService, Throwable, Unit] =
ZIO.accessM[ChatService](_.get.handleUserMsg(userId, msg))
def handleServerMsg(msg: Dto.ServerMsg): ZIO[ChatService, Throwable, Unit] =
ZIO.accessM[ChatService](_.get.handleServerMsg(msg))
}
104 changes: 104 additions & 0 deletions jvm/src/main/scala/example/modules/services/ChatServiceLive.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package example.modules.services

import cats.implicits._
import example.model.ChatData.User
import example.modules.services.chatService.ChatService
import example.shared.Dto
import example.shared.Dto.ChangeChatName
import example.shared.Dto.ChatMsg
import example.shared.Dto.ChatUserLeft
import example.shared.Dto.NewChatUser
import example.shared.Dto.UnknownData
import fs2.concurrent.Queue
import io.scalaland.chimney.dsl._
import zio._
import zio.logging.Logger
import zio.logging.LogLevel

class ChatServiceLive(users: Ref[Vector[User]], idCounter: Ref[Int], logger: Logger) extends ChatService.Service {
def getUsers(): zio.Task[Dto.ChatUsers] =
for {
users <- users.get
dtoUsers = users.map(_.into[Dto.ChatUser].transform)
} yield Dto.ChatUsers(dtoUsers.toSet)

def createUser(out: Queue[Task[*], Dto.ChatDto]): Task[Dto.ChatUser] =
for {
id <- idCounter.modify(id => (id, id + 1))
u = User(id = id, name = "unknown", out = out)
dtoU = u.into[Dto.ChatUser].transform

_ <- out.enqueue1(dtoU)
_ <- handleServerMsg(Dto.NewChatUser(dtoU))

_ <- users.update(_ :+ u)
dtoUsers <- getUsers()
_ <- out.enqueue1(dtoUsers)
} yield dtoU

def handleUserMsg(userId: Int, msg: Dto.ClientMsg): Task[Unit] = msg match {
case chatMsg: ChatMsg =>
for {
maybeUser <- users.map(_.find(_.id == userId)).get
_ <- maybeUser
.fold(
logError("ChatMsg", s"userId: $userId not found")
)(
broadcast(_, userDto => chatMsg.copy(user = userDto))
)
} yield ()

case cn: ChangeChatName =>
for {
maybeOldUser <- users.modify(changeUserName(userId, cn.newName, _))
_ <- maybeOldUser
.fold(
logError("ChangeChatName", s"userId: $userId not found")
)(
broadcast(_, userDto => cn.copy(oldUser = userDto))
)
} yield ()

case ud: UnknownData =>
for {
users <- users.get
maybeUser = users.find(_.id == userId)
_ <- maybeUser.fold(zioUnit)(_.out.enqueue1(ud))
} yield ()
}

def handleServerMsg(msg: Dto.ServerMsg): zio.Task[Unit] = msg match {
case nu: NewChatUser =>
broadcast(nu)
case ul @ ChatUserLeft(u) =>
for {
_ <- users.update(_.filter(_.id != u.id))
_ <- broadcast(ul)
} yield ()
}

def zioUnit: ZIO[Any, Throwable, Unit] = ZIO.unit // Nothing -> Throwable
def broadcast(msg: Dto.ChatDto): ZIO[Any, Throwable, Unit] =
for {
users <- users.get
_ <- users.foldLeft(zioUnit)((acc, rcvU) => acc *> rcvU.out.enqueue1(msg))
} yield ()

def broadcast(u: User, createMsg: Option[Dto.ChatUser] => Dto.ChatDto): ZIO[Any, Throwable, Unit] = {
val dtoU = u.into[Dto.ChatUser].transform
val msgToBroadcast = createMsg(dtoU.some)
broadcast(msgToBroadcast)
}

def changeUserName(userId: Int, newName: String, users: Vector[User]) = {
import com.softwaremill.quicklens._
val uIdPred: User => Boolean = _.id == userId

val maybeOldUser = users.find(uIdPred)
val updatedUsers = users.modify(_.eachWhere(uIdPred).name).setTo(newName)

(maybeOldUser, updatedUsers)
}

def logError(caller: String, msg: String): ZIO[Any, Throwable, Unit] = logger.log(LogLevel.Error)(s"$caller -> $msg")
}
7 changes: 4 additions & 3 deletions shared/src/main/scala/example/shared/Dto.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ object Dto {
case class ChatUser(id: Int = 0, name: String = "unknown") extends ChatDto
case class ChatUsers(value: Set[ChatUser] = Set()) extends ChatDto

sealed trait ClientMsg extends ChatDto
case class ChatMsg(user: Option[ChatUser] = None, msg: String) extends ClientMsg
case class UnknownData(data: String) extends ClientMsg
sealed trait ClientMsg extends ChatDto
case class ChatMsg(user: Option[ChatUser] = None, msg: String) extends ClientMsg
case class ChangeChatName(oldUser: Option[ChatUser] = None, newName: String) extends ClientMsg
case class UnknownData(data: String) extends ClientMsg

sealed trait ServerMsg extends ChatDto
case class NewChatUser(u: ChatUser) extends ServerMsg
Expand Down

0 comments on commit 75523ed

Please sign in to comment.