Пишем простой чат с консольным интерфейсом используя трубно-ориентированное программирование с котами
Если в процессе изучения gRPC хотите попрактиковаться с Bidirectional Streaming (или так называемый двунаправленная потоковая передача данных), c запросами в рамках одного соединения, инициированием событий со стороны сервера, то создание простенького чата может быть отличным способом.
Проект будем писать на языке Scala с использованием библиотеки fs2-grpc. Будем использовать клиент-серверную архитектуру, где клиенты могут отправлять сообщения на сервер, который будет ретранслировать их всем подключенным клиентам.
gRPC
Но прежде чем начать, давайте вспомним, что такое gRPC и как он связан с HTTP/2 не углубляясь в подробности (на эту тему и так достаточно статей).
gRPC - это RPC-фреймворк (Remote Procedure Call), который позволяет создавать клиент-серверные приложения для обмена данными. gRPC использует под капотом протокол HTTP/2, который позволяет ускорить передачу данных, уменьшить объем передаваемых данных и снизить задержку. Важно упомянуть о том, что gRPC использует Protobuf чтобы определить методы и структуру сообщений с помощью специального языка описания интерфейсов, а затем сгенерировать код для работы с этими сообщениями на различных языках программирования. Protobuf обеспечивает эффективную сериализацию/десериализацию данных в компактный бинарный формат.
Механизм работы чата
Ограничения
Во первых, чтобы не усложнять проект, я решил делать чат консольным и не хранить сообщения на стороне сервера.
Bidirectional Streaming
Поговорим про механизм работы чата с Bidirectional Streaming в gRPC. Процесс обмена сообщениями будет работать таким образом, что сервер и клиент обмениваются потоками сообщений в рамках одного соединения. Клиент отправляет событие, сервер его получает, обрабатывает, а затем отправляет ответное событие. Клиент и сервер обмениваются сообщениями асинхронно. Таким образом, данные передаются между клиентом и сервером в реальном времени и в обе стороны.
Мультикастинг событий внутри сервера
Когда клиенты подключаются к серверу, каждый из них может отправлять события на сервер. Однако, возникает проблема - как переслать сообщения от одного клиента остальным подключенным клиентам.
Для решения этой проблемы можно использовать механизм мультикастинга с помощью топика. Топик - это объект, который позволяет отправлять сообщения одновременно нескольким подписчикам. То есть, если один клиент отправляет событие, то полученное событие на стороне сервера будет направлено на этот топик, а оттуда автоматически пересылается всем клиентам, подписанным на этот топик.
Для реализации мультикастинга я использовал Topic из библиотеки fs2 (Functional Streams for Scala).
Таким образом, визуально механизм взаимодействия клиентов в сервером выглядит примерно так.
Клиент генерирует событие и отправляет на сервер, который, в свою очередь, раскидывает это событие по клиентам
Реализация
Для языка Scala есть несколько библиотек для работы с gRPC. Я использую fs2-grpc, который является оберткой над ScalaPB и сделана на основе функциональной библиотеки для работы со стримами - fs2.
fs2-grpc поддерживает все типы RPC-вызовов - Unary, Server Streaming, Client Streaming и Bidirectional Streaming. Она также предоставляет механизмы обработки ошибок и управления ресурсами, такие как Resource и Bracket. fs2-grpc интегрируется со стеком функциональных библиотек для работы с эффектами (cats-effect, zio, monix). В моем примере используется Cats Effect 3.
Proto
И так, приступим. В первую очередь нужно накидать прото-файл, в котором опишем контракт взаимодействия клиента и сервера.
Создадим некоторый ChatService
с методом eventsStream
, у которого на входе и на выходе потоковые данные с типом Events
(то есть будем события через стримы туда-сюда делать).
1
2
3
4
service ChatService {
rpc eventsStream(stream Events) returns (stream Events) { }
}
Events
содержит данные обернутые в тип события, которые могут быть инициированы как на стороне клиента, так и сервера (в нашем случае только на стороне клиентов).
1
2
3
4
5
6
7
8
message Events {
oneof event {
Login client_login = 1;
Logout client_logout = 2;
Message client_message = 3;
Shutdown server_shutdown = 4;
}
Реализация сервера
Ранее мы говорили, что сервер должен получать события от клиентов и транслировать их остальным клиентам.
После компиляции прото-файла будет сгенерирован базовый код для работы с gRPC, среди которого будет интерфейс ChatServiceFs2Grpc
. Он должен быть имплементирован на стороне сервера. Моя реализация имеет следующий вид.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object ChatService {
def apply[F[_]: Concurrent: Console](
eventsTopic: Topic[F, Events]
): ChatServiceFs2Grpc[F, Metadata] = new ChatServiceFs2Grpc[F, Metadata] {
val eventsToClients: Stream[F, Events] =
eventsTopic
.subscribeUnbounded
.evalTap(event => Console[F].println(s"From topic: $event"))
override def eventsStream(
eventsFromClient: fs2.Stream[F, Events],
ctx: Metadata
): fs2.Stream[F, Events] = {
eventsToClients.concurrently(
eventsFromClient
.evalTap(event => Console[F].println(s"Event from client: $event"))
.evalMap(eventsTopic.publish1)
)
}
}
}
Мы видим метод eventsStream
, который описывали в proto-файле. Из потока eventsFromClient
получаем события от клиентов. На выходе отдаем некоторый поток событий eventsToClients
. Если посмотреть выше, то видим, что eventsToClients
это подписка на топик eventsTopic: Topic[F, Events]
, в который публикуются события от клиента для отправки остальным клиентам.
Сборка и запуск сервера
Собираем все компоненты, которые представляют собой основу серверного приложения.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object ChatServerApp extends IOApp {
private def runServer(service: ServerServiceDefinition): IO[Nothing] = {
NettyServerBuilder
.forPort(50053)
.keepAliveTime(5, TimeUnit.SECONDS)
.addService(service)
.resource[IO]
.evalMap(server => IO(server.start()))
.useForever
}
override def run(args: List[String]): IO[ExitCode] = for {
topic <- Topic[IO, Events]
serviceResource: Resource[IO, ServerServiceDefinition] =
ChatServiceFs2Grpc.bindServiceResource[IO](ChatService(topic))
_ <- serviceResource.use(runServer)
} yield ExitCode.Success
}
В функции runServer
создается и запускается новый сервер с помощью NettyServerBuilder
, который прослушивает порт 50053. NettyServerBuilder
предоставляется библиотекой gRPC для создания серверов, использующих Netty в качестве транспорта и позволяет настроить параметры сервера (порт, keepAliveTime и т.д.)
В методе run
создается топик, который будет использоваться для мультикастинга событий по клиентам. Создаем инстанс сервиса ChatService
и биндим его к серверу. Затем запускаем наш сервер.
1
$ sbt "runMain org.github.ainr.chat.server.ChatServerApp"
В итоге, когда сервер запущен, клиенты смогут подключаться к нему, отправлять сообщения и получать их в режиме реального времени.
Реализация клиента
Что должен делать клиент? Клиент может показаться чутка сложнее, но на самом деле тут тоже все просто. Клиент делает несколько простых вещей:
- Читает ввод в консоль
- Отправляет события серверу
- Получает события от сервера и обрабатывает их
- Печатает полученные сообщения в консоль
Со стороны клиента тоже все сделано на стримах (Stream).
Чтение ввода из консоли
Для чтения ввода из консоли снова прибегаем к помощи стримов. Создаем класс InputStream
с методом read
, который возвращает поток сообщений напечатанных клиентом - Stream[F, String]
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object InputStream {
def apply[F[_]: Async: Console](bufSize: Int): InputStream[F] = {
new InputStream[F] {
override def read: Stream[F, String] = {
fs2.io
.stdinUtf8(bufSize)
.through(fs2.text.lines)
.evalTap(erase) // удалить из консоли ввод
.filter(_.nonEmpty) // фильтруем пустые строки
}
private def erase: PartialFunction[String, F[Unit]] = {
_ => Console[F].print("\u001b[1A\u001b[0K") // удаляет то, что мы напечатали в консоль путем ввода спец-символов
}
}
}
}
По коду видно, что он берет поток символов, преобразует их в строки и фильтрует пустые. Магическим может показаться только лишь метод erase
, который печатает что-то непонятное в консоль.
На самом деле никакой магии нет. Все, что он делает - это удаляет то, что мы напечатали в консоль путем ввода спец-символов ANSI чтобы сообщения не дублировались.
Логика клиента
Далее введенный пользователем в консоль текст нужно преобразовать в тип события Event
и отправить серверу.
В целом, логика клиента довольно простая и описана путем композиции стримов в методе start
. Здесь снова фигурирует chatService: ChatServiceFs2Grpc[F, Metadata]
с методом eventsStream
сгенерированный библиотекой fs2-grpc
на вход которого отправляем события из консоли (InputStream
), генерируемые пользователем.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object ChatClient {
def apply[F[_]: Concurrent: Console](
clientName: String,
inputStream: InputStream[F],
chatService: ChatServiceFs2Grpc[F, Metadata]
): ChatClient[F] = new ChatClient[F] {
private val grpcMetaData = new Metadata() // empty
override def start: F[Unit] = {
chatService
.eventsStream(
login(clientName) ++ inputStream.read.through(handleInput),
grpcMetaData
)
.through(processEvent) // обрабатываем полученные события от сервера
.through(writeToConsole) // пишем в консоль
.compile
.drain
}
private def login(clientName: String): fs2.Stream[F, Events] =
fs2.Stream(Events(ClientLogin(Login(clientName))))
// ...
Metadata в gRPC - это способ передачи дополнительных метаданных между клиентом и сервером, которые представляет собой пары ключ-значение и могут быть добавлены к любому запросу.
На выходе eventsStream
ловим события с сервера, сгенерированные другими клиентами, обрабатываем их методом processEvent
, который преобразовывает события в строки.
1
2
3
4
5
6
7
8
private def processEvent: Pipe[F, Events, String] =
_.map { data =>
data.event match {
case event: ClientLogin => s"${Color.Green(event.value.name).overlay(Bold.On)} entered the chat." case event: ClientLogout => s"${Color.Blue(event.value.name).overlay(Bold.On)} left the chat." case event: ClientMessage => s"${Color.LightGray(s"${event.value.name}:").overlay(Bold.On)} ${event.value.message}"
case _: ServerShutdown => s"${Color.LightRed("Server shutdown")}"
case unknown => s"${Color.Red("Unknown event:")} $unknown"
}
}
Для форматированного вывода текста в консоли используется библиотека fansi от lihaoyi, предназначенная для работы с цветами и стилями текста в консольном приложении. Она позволяет добавлять цветовые и стилевые эффекты к тексту, что делает консольный вывод более информативным и привлекательным. Далее сообщения будут напечатаны в консоль методом writeToConsole
.
Сборка и запуск клиента
Собираем все компоненты, которые представляет собой основу клиентского приложения.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
object ChatClientApp extends IOApp {
private def buildChatService(channel: Channel): Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =
ChatServiceFs2Grpc.stubResource[IO](channel)
private def resources: Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =
NettyChannelBuilder
.forAddress("127.0.0.1", 50053)
.usePlaintext()
.resource[IO]
.flatMap(buildChatService)
override def run(args: List[String]): IO[ExitCode] =
resources.use { chatServiceFs2Grpc =>
ChatClient(
args.headOption.getOrElse("Anonymous"),
InputStream[IO](bufSize = 1024),
chatServiceFs2Grpc
).start
}.as(ExitCode.Success)
}
NettyChannelBuilder
- это класс, предоставляемый библиотекой gRPC для создания клиентов, использующих Netty
в качестве транспорта. Он позволяет настроить параметры клиента.
В функции buildChatService
создается ресурс, который представляет собой клиент для обращения к серверу чата. Для его создания используется метод stubResource
из ChatServiceFs2Grpc
.
Запускаем клиент через sbt, передав в аргументы имя клиента.
1
$ sbt "runMain org.github.ainr.chat.client.ChatClientApp Username"
И можем общаться :)
Вместо заключения
Создание небольших, простых проектов - это отличный способ попрактиковаться и углубить свои знания в технологиях. Это может быть что-то, что вы можете написать быстро и без особых усилий, но в то же время дает возможность изучить какой-то новый аспект технологии или языка программирования.
Простые проекты могут быть очень разнообразными. Например, вы можете написать небольшой веб-сервер, создать небольшую игру, написать скрипт для автоматического сбора данных, или же написать чат на базе gRPC, как мы обсуждали ранее.
Преимущество создания небольших проектов заключается в том, что вы можете более глубоко изучить технологию и применить знания на практике. Вы также можете быстро увидеть результат своей работы и получить удовлетворение от завершения проекта.
Не бойтесь начинать с чего-то простого и постепенно увеличивать сложность - это поможет вам стать более опытным и уверенным программистом.
Исходники
Код проекта можно посмотреть на гитхабе - https://github.com/a-khakimov/simple-fs2-grpc-chat.