Разработка асинхронного сервиса на Rust с использованием mio

Введение

Как то ранее я писал пробный сервис на Rust, но время идет язык изменяется и поэтому я решил повторить свой опыт, но в этот раз решил написать серверное приложение с уклоном в телеметрию.

Для экспериментального проекта я выбрал реализацию протокола wialon ips 1.1, а чтобы более детально углубиться в асинхронность выбрал библиотеку mio.

Работа с библиотекой mio

Mio это низкоуровенная библиотека для работы с сетевыми соединениями в неблокирующем режиме и основанная на событиях. Кроме того эта библиотека имеет минимальный оверхед над абстракциями ОС.

Кроме того данная библиотека используется под капотом в популярной библиотеке tokio.

Основным понятием в библиотеке является пул событий. Как ясно из названия в нем регистрируются входящие события, с которыми в дальнейшим и будет работать приложение, и сохраняет их в хранилище. У каждого события есть свой токен, и сетевое соединение.

Пул событий объявляется следующим образом:

use mio::{Events, Poll};

let mut poll = Poll::new()?;

Чтобы событие появилось в пуле его надо зарегистрировать с помощью функции register, которая принимает в параметрах сетевое соединение, токен и тип события:

const SERVER: Token = Token(0);

let addr = "127.0.0.1:13265".parse()?;
let mut server = TcpListener::bind(addr)?;

poll.registry().register(&mut server, SERVER, Interest::READABLE)?;

После того как событие добавили в пул, необходимо этот пул соединить с хранилищем событий и ждать когда произойдет первое событие:

let mut events = Events::with_capacity(128);

loop {
    poll.poll(&mut events, None)?;
}

Данная операция выполняется в цикле так как, если событий не происходит мы ждем их появления.

Для обработки события необходимо запустить цикл, с обработчиками для каждого события:

const CLIENT: Token = Token(1);

for event in events.iter() {
    match event.token() {
        SERVER => loop {  
            let (mut connection, address) = match server.accept() {
                Ok((connection, address)) => (connection, address),
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => break,
                    _ => return Err(e),
                }
            };

            poll.registry().register(&mut connection, CLIENT, Interest::READABLE | Interest::WRITABLE)?;
        },
        CLIENT => {
            // событие для чтения
            if event.is_readable() {
                // обработка полученного пакета
            }


            // событие для для записи
            if event.is_writable() {
                // обработка полученного пакета
            }
        }
    }
}

Приложение ждет когда появится входящее соединение и если оно успешно установлено создает для него новый токен и добавляет его в пул для ожидания от него пакетов, и как только наступает событие (пришел сетевой пакет) приложение начинает его обработку.

Модернизация приложения

В целом всего выше описанного хватит чтобы сделать просто приложение на mio. Но как только мы делаем что-то реальное возникают следующие вопросы:

  • как обработать пул соединений с клиентским сессиями
  • как сделать неблокирующую операцию сохранения или вывода на экран принятых данных

Обработка пула соединений с клиентским сессиями

Для того чтобы в приложении обработать пул соединений один из вариантов завести хэш-таблицу в которой ключом будет токен, а значение открытое соединение, и в момент открытия входящего соединения добавлять его в эту хэш таблицу:

let connections: HashMap<Token, TcpStream> = HashMap::new();
let token: Token = Token(SERVER.0 + 1);

...

for event in events.iter() {
    match event.token() {
        SERVER => loop {  
            let (mut connection, address) = match server.accept() {
                Ok((connection, address)) => (connection, address),
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => break,
                    _ => return Err(e),
                }
            };
            token += 1
            connections.insert(token, connection)
            poll.registry().register(&mut connection, token, Interest::READABLE)?;
        },
        token => {
            // клиент с token отправил данные, чем инициировал событие с типом чтения
            let connection = connections.get_mut(&token).unwrap();
            if event.is_readable() {
                // чтение данных из connection для клиента инициирующего событие  
                // их обработка
            }
        }
    }
}

После такой модификации кода когда на сервер приходит входящее соединение оно регистрируется в пуле событий и добавляется в пул соединений. Далее когда от клиента с token приходит сообщение пул событий сообщает о наступлении данного события и после этого можно прочитать сообщение отправленное клиентом и обработать его.

Таким образом получился простой механизм сессий.

Неблокирующий вывод данных

Следующая проблема это неблокирующее сохранение данных или вывод их на экран. Так как если это делать прямо в месте обработки соединений, то необходимо будет либо делать syscall для сохранения на диск, либо ждать ответа по сети и тд. Все это может повлиять на производительность и чтобы такого не было лучше эту обработку проводить в отдельном потоке.

Для взаимодействия между потоками можно использовать стандарные механизм каналов.

Модификация кода будет следующая:

use std::sync::mpsc::{sync_channel};

struct Packet{} 

let (sender, receiver) = sync_channel::<Packet>(1024);
thread::spawn(move || {
    loop {
        let p = receiver.recv().unwrap();
        println!("{}", p); // или сохранение на диск или в бд
    }
});

...

for event in events.iter() {
    match event.token() {
        ...

        token => {        
            let connection = connections.get_mut(&token).unwrap();
            if event.is_readable() {
                let p = Packet{}
                
                // получаем данные из connection и преобразуем их в Packet
                
                sender.send(p)
            }            
        }
    }
}

В данном коде вы запустили отдельный поток, который отвечает только за обработку принятых данных, а объявленный ранее цикл событий обеспечивает ему их поставку.

Мои впечатления от Rust

Из плюсов языка для себя строгость языка в целом, а также интересные концепции языка типа borrow checker, который проверяет что нет ссылок на несуществующие объекты. Также понравилась идея что у данных может быть только одна мутабельная ссылка (если вдруг надо использовать несколько мутабельных ссылок есть модуль std:cell но надо быть аккуратным так как это не потоко-безопасно).

Из минусов для себя я ответил что качественный многопоточный код писать труднее чем на Go, хотя он и получается безопасней, но для этого надо понимать основные концепции языка такие как владение и время жизни.

Про generics и gc уже много писали в интернетах, поэтому описывать это не вижу смысла.

Заключение

В данной статья я описал в целом концепцию работы приложения, исходники которого можно найти на github, так как описывать весь код целиком не имеет большого смысла. Также в статье не приведено структурирование кода и объединения его в структуры, так как оформление тоже можно посмотреть на github.

Полезные ссылки

 
comments powered by Disqus