当前位置:   article > 正文

Rust : windows下protobuf和压缩传输方案_protobuf rpc rust

protobuf rpc rust

此前dbpystream库是用python开发 web api。今天在rust中试用一下protobuf。

本文关键词:编译器、protobuf、proto文件、序列化、zstd压缩,build。

一、 protobuf编译器下载

具体见相关文章。没有编译器,protobuf无法运行。
windows参见:

https://blog.csdn.net/wowotuo/article/details/139458846?spm=1001.2014.3001.5502。
  • 1

二、proto文件的准备

proto文件中主要模拟了一个dbpystream中一个get_price函数的输入和输出的格式,输入HistoryBarRequest ,输出HistoryBarResponse。HistoryBarResponse中,有代码名称,日期,开盘价,最高价等。
在格式中,包括了string,TimeStamp,double; 其中repeated就是vec格式。

syntax = "proto3";
package dbdata;
import public "google/protobuf/timestamp.proto";
service DataService {
  rpc query (HistoryBarRequest) returns (HistoryBarRequest) {}
}
service Login{
  rpc auth (Auth) returns (Response) {}
}
message Auth{
   string id =1; 
   string password=2; 
}
message HistoryBarRequest {
  string security  = 1;
  string frequency = 2;
  FieldParam fields     = 3;
  google.protobuf.Timestamp start_date = 4;//收集时间
  google.protobuf.Timestamp end_date = 5;//收集时间
  bool is_fq  =6 ; 
}
message HistoryBarResponse{
  repeated string securitycode = 1;
  repeated google.protobuf.Timestamp  datetime =2;
  repeated double  open = 3;
  repeated double  high = 4;
  repeated double close = 5;
  repeated double low =6;
  repeated double volume=7;
  repeated double amount=8;
  repeated sint64 is_fq = 9;
}

message FieldParam{
  bool is_all = 1;
}

message Response {
  bool status = 1;
  bytes msg   = 2;
  string error = 3;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

三、toml文件、文件目录结构、build.rs
1、toml文件有

[package]
name = "clap-2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = "0.7.5" # web 服务器
anyhow = "1" # 错误处理
reqwest = { version = "0.12.4", features = ["json"] } # HTTP 客户端
tokio = { version = "1", features = ["full"] } # 异步处理库
prost = "0.12.6"
# Only necessary if using Protobuf well-known types:
prost-types = "0.12.6"
serde = { version = "1", features = ["derive"] } # 序列化/反序列化数据
polars = { version = "0.39.0", features = ["json"]}
chrono = { version = "0.4", features = ["unstable-locales"] }
zstd = "0.13" # 压缩库
[build-dependencies]
prost-build = "0.12.6" # 编译 protobuf
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

上面polars,chrono,prost-types,prost-build,prost,zstd是关键库,其它暂时可以不看。

2、目录结构
具体如下:

PS D:\my_program\clap-2> tree /F
卷 新加卷 的文件夹 PATH 列表
卷序列号为 D855-8BFE
D:.
│  .gitignore
│  build.rs
│  Cargo.lock
│  Cargo.toml
│  dbdata.proto
│
└─src
    │  main.rs
    │
    └─pb
            dbdata.rs
            mod.rs
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

可见,在src/目录下,手动创建了一个pb文件夹,存放未来生成的dbdata.proto文件。

3、build.rs
在src同级目录上(如上),创建build.rs,具体如下:

fn main() {
    prost_build::Config::new()
        .out_dir("src/pb")//设置proto输出目录
        .compile_protos(&["dbdata.proto"], &["."])//我们要处理的proto文件
        .unwrap();
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行cargo build,即生成了dbdata.rs,具体内容如下:

// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Auth {
    #[prost(string, tag = "1")]
    pub id: ::prost::alloc::string::String,
    #[prost(string, tag = "2")]
    pub password: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HistoryBarRequest {
    #[prost(string, tag = "1")]
    pub security: ::prost::alloc::string::String,
    #[prost(string, tag = "2")]
    pub frequency: ::prost::alloc::string::String,
    #[prost(message, optional, tag = "3")]
    pub fields: ::core::option::Option<FieldParam>,
    /// 收集时间
    #[prost(message, optional, tag = "4")]
    pub start_date: ::core::option::Option<::prost_types::Timestamp>,
    /// 收集时间
    #[prost(message, optional, tag = "5")]
    pub end_date: ::core::option::Option<::prost_types::Timestamp>,
    #[prost(bool, tag = "6")]
    pub is_fq: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HistoryBarResponse {
    #[prost(string, repeated, tag = "1")]
    pub securitycode: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
    #[prost(message, repeated, tag = "2")]
    pub datetime: ::prost::alloc::vec::Vec<::prost_types::Timestamp>,
    #[prost(double, repeated, tag = "3")]
    pub open: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "4")]
    pub high: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "5")]
    pub close: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "6")]
    pub low: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "7")]
    pub volume: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "8")]
    pub amount: ::prost::alloc::vec::Vec<f64>,
    #[prost(sint64, repeated, tag = "9")]
    pub is_fq: ::prost::alloc::vec::Vec<i64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FieldParam {
    #[prost(bool, tag = "1")]
    pub is_all: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Response {
    #[prost(bool, tag = "1")]
    pub status: bool,
    #[prost(bytes = "vec", tag = "2")]
    pub msg: ::prost::alloc::vec::Vec<u8>,
    #[prost(string, tag = "3")]
    pub error: ::prost::alloc::string::String,
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

4、mod.rs
在pb目录下,创建mod.rs:

pub mod dbdata;
  • 1

四、原始数据、main.rs

1、原始数据

这个原始数据csv的格式,一共是5499行,9列。
在这里插入图片描述
即收到request后,将发送这个数据内容出去。
文件名称是"C:\Users\Desktop\test.csv"。
这里采用了polars来读取csv文件。模拟的是服务端读相关数据库(如clickhouse)。

2、main.rs

下面的main.rs模拟了收到resquest,发送response的过程。这个过程可以用web框架,如axum,也可以用grpc框架。这部分不是今天的重点。

需要说明的是:在序列化HistoryBarResponse的基础上,并用zstd库进行了压缩打包,进一步减少了二进制对象的大小,有利于网络传输。

use pb::dbdata::{self, HistoryBarResponse,Response};
mod pb;
use prost_types::Timestamp;
use std::time::{Duration, SystemTime,Instant};
use polars::prelude::*;
use chrono::{NaiveDate, NaiveDateTime,NaiveTime};
use zstd;
fn main() ->Result<(),PolarsError>{
    let request = dbdata::HistoryBarRequest {
        security: String::from("600036.XSHG"),
        frequency: String::from("1minute"),
        fields: Some(dbdata::FieldParam {is_all:true}),
        start_date: Some(prost_types::Timestamp::from(SystemTime::now()-Duration::from_secs(3600*12*250))),
        end_date:Some(prost_types::Timestamp::from(SystemTime::now())),
        is_fq:true,
    };
    println!("模拟收到request:{:?}",request);
    println!("模拟开始进行相应的数据处理.....");
    let now_0 = Instant::now();

    
    let file = r"C:\Users\hongsl\Desktop\test.csv";
    let df: DataFrame = CsvReader::from_path(file)?
            .has_header(true)
            .finish().unwrap();
    println!("starting...");
    println!("csv =>df 文件的行列信息 : {:?}",df.shape());
    println!("读csv花时: {:?} 秒!", now_0.elapsed().as_secs_f32());
    let now_1 = Instant::now();
    let res_raw = HistoryBarResponse{
        securitycode : df.column("securitycode")?.str()?.into_no_null_iter().map(|s|String::from(s)).collect(),
        datetime:df.column("date")?.str()?.into_no_null_iter()
        .map(|t| convert(t)).collect(),
        open:df.column("open")?.f64()?.into_no_null_iter().collect(),
        high:df.column("high")?.f64()?.into_no_null_iter().collect(),
        close:df.column("close")?.f64()?.into_no_null_iter().collect(),
        low:df.column("low")?.f64()?.into_no_null_iter().collect(),
        volume:df.column("volume")?.i64()?.into_no_null_iter().map(|v|v as f64).collect(),
        amount:df.column("amount")?.f64()?.into_no_null_iter().collect(),
        is_fq:df.column("is_fq")?.i64()?.into_no_null_iter().collect(),
    };
    println!("准备historybarresponse花时: {:?} 秒!", now_1.elapsed().as_secs_f32());
    //println!("{:?}", res);
    let now_2 = Instant::now();
    let encoded_raw = prost::Message::encode_to_vec(&res_raw);
    println!("historybarresponse => encoded 花时: {:?} 秒!", now_2.elapsed().as_secs_f32());
    let compression_level = 3;
    // 服务端对序列化对象进行压缩,
    let now_3 = Instant::now();
    let compressed = zstd::encode_all(&*encoded_raw, compression_level).unwrap();
    println!("historybarresponse encoded =>compressed 花时: {:?} 秒!", now_2.elapsed().as_secs_f32());
    // 服务端模拟通过web或grpc发送
    let res = Response{
        status:true,
        msg: compressed,
        error:String::from(""),
    };
    let encoded = prost::Message::encode_to_vec(&res);
    println!("服务端从csv =>compressed 后发出:{:?}",now_0.elapsed().as_secs_f32());
    let now_4 = Instant::now();
    // 模拟客户端接收到web或grpc相应的数据对象
    let decoded_raw =  < pb::dbdata::Response as prost::Message>::decode(&encoded[..]).unwrap();
    
    // 并进行解压,得到Hist
    let decoded_raw: Vec<u8> = zstd::decode_all(decoded_raw.msg.as_slice()).unwrap();
    let decoded  = < pb::dbdata::HistoryBarResponse as prost::Message>::decode(&decoded_raw[..]).unwrap();
    
    println!("模拟发送相应的数据: {:?}", &decoded.securitycode[0]);
    println!("客户端解压数据花时:{:?}",now_4.elapsed().as_secs_f32());
    Ok(())
}

fn convert(dt_str:&str) ->Timestamp {
    let naive_date = NaiveDate::parse_from_str(dt_str, "%Y/%m/%d").unwrap();
    let nano_second = NaiveTime::from_hms_milli_opt(0, 0, 0, 0).unwrap();
    let dt: NaiveDateTime = naive_date.and_time(nano_second );
    Timestamp{
        seconds:dt.and_utc().timestamp(),
        nanos:0,
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

运行如下:

模拟收到request:HistoryBarRequest { security: "600036.XSHG", frequency: "1minute", fields: Some(FieldParam { 
is_all: true }), start_date: Some(Timestamp { seconds: 1707372680, nanos: 689289100 }), end_date: Some(Timestamp { seconds: 1718172680, nanos: 689290500 }), is_fq: true }
模拟开始进行相应的数据处理.....
starting...
csv =>df 文件的行列信息 : (5499, 9)
读csv花时: 0.0031793 秒!
准备historybarresponse花时: 0.0007998 秒!
historybarresponse => encoded 花时: 0.0002774 秒!
historybarresponse encoded =>compressed 花时: 0.0017601 秒!
服务端从csv =>compressed 后发出:0.0061509
模拟发送相应的数据: "600036.XSHG"
客户端解压数据花时:0.0015236
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/751774
推荐阅读
相关标签
  

闽ICP备14008679号