ScalarFunctionExpr protobuf serialization/deserialization: a minimal example

What is ScalarFunctionExpr

ScalarFunctionExpr is DataFusion's complete physical-layer description of a single scalar function call. It records:

- which function is called (its name and implementation)
- the function's arguments
- what it returns, including the return type and whether the result is nullable
- configuration options used during execution

Take sqrt(a), the square root, as an example. Its physical expression looks roughly like this:

```text
ScalarFunctionExpr
  function: implementation of sqrt
  name: "sqrt"
  args:
    - Column("a", index=0)
  return_type: Float64
  nullable: false/true
```

Scalar functions vs. aggregate functions

- Scalar functions map each input value to exactly one output value, e.g. sqrt, abs, lower, upper, trim, length, concat.
- Aggregate functions map many input rows to a single output value, e.g. sum, avg, count, max, min.

What is protobuf

protobuf (Protocol Buffers) is a serialization format designed for efficient transmission and cross-language communication.

JSON:
- text format, human-readable
- easy to debug
- relatively large
- usually slower to parse
- no enforced schema

protobuf:
- binary format, not human-readable
- compact
- fast to parse
- strictly depends on a .proto schema
- better suited to high-performance, cross-language RPC

How to write the minimal example

1. Define the expression
2. Encode it
3. Decode it
4. Verify the result

Complete example code

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! See `main.rs` for how to run it.
//!
//! This example demonstrates the smallest useful round trip for a physical
//! [`ScalarFunctionExpr`]:
//!
//! 1. Build a physical expression for `sqrt(a)`.
//! 2. Serialize it to a protobuf `PhysicalExprNode`.
//! 3. Deserialize it back to a physical expression.
//! 4. Evaluate both expressions against the same batch.

use std::sync::Arc;

use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::common::config::ConfigOptions;
use datafusion::common::{DataFusionError, Result};
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::Column;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::protobuf::physical_expr_node::ExprType;

pub async fn scalar_function_expr() -> Result<()> {
    println!("=== ScalarFunctionExpr Proto Round Trip Example ===\n");

    // Describe the input data.
    // The input table / batch has a single column:
    //   column name: a
    //   type:        Float64
    //   nullable:    false
    // Roughly equivalent to this SQL:
    //   CREATE TABLE t (
    //       a DOUBLE NOT NULL
    //   );
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));

    // Take the square root of column 0 (`a`) of the input batch.
    let expr = Arc::new(ScalarFunctionExpr::try_new(
        datafusion::functions::math::sqrt(),
        vec![Arc::new(Column::new("a", 0))],
        schema.as_ref(),
        Arc::new(ConfigOptions::new()),
    )?) as Arc<dyn PhysicalExpr>;
    println!("Step 1: Built physical expression: {expr}");

    // Serialize: convert the physical expression sqrt(a@0) built above into its proto structure.
    let codec = DefaultPhysicalExtensionCodec {};
    let proto = serialize_physical_expr(&expr, &codec)?;
    let Some(ExprType::ScalarUdf(scalar_udf)) = proto.expr_type.as_ref() else {
        return Err(DataFusionError::Execution(
            "Expected ScalarUdf proto node".to_string(),
        ));
    };
    println!(
        "Step 2: Serialized to proto: name={}, args={}, has_fun_definition={}",
        scalar_udf.name,
        scalar_udf.args.len(),
        scalar_udf.fun_definition.is_some()
    );

    // Deserialize.
    let ctx = SessionContext::new();
    let decoded_expr = parse_physical_expr(&proto, &ctx.task_ctx(), &schema, &codec)?;
    println!("Step 3: Deserialized expression: {decoded_expr}");

    // Verify that the deserialized expression evaluates to the same result as the original.
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Float64Array::from(vec![4.0, 9.0, 16.0]))],
    )?;
    let original = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let decoded = decoded_expr
        .evaluate(&batch)?
        .into_array(batch.num_rows())?;

    let original = original
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| {
            DataFusionError::Execution("Expected Float64 result array".to_string())
        })?;
    let decoded = decoded
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| {
            DataFusionError::Execution("Expected Float64 result array".to_string())
        })?;

    assert_eq!(original, decoded);
    println!("Step 4: Evaluated both expressions successfully");
    println!(" input: [4.0, 9.0, 16.0]");
    println!(" output: {decoded:?}");

    Ok(())
}
```

Execution flow

```text
ScalarFunctionExpr(sqrt(a))
  ↓ serialize_physical_expr
PhysicalExprNode::ScalarUdf
  ↓ parse_physical_expr
ScalarFunctionExpr(sqrt(a))
  ↓ evaluate
Float64Array [2.0, 3.0, 4.0]
```

Run command and output

```text
/Users/zhengpeng/.cargo/bin/cargo run --color=always --example proto --profile dev --manifest-path /Users/zhengpeng/Source/Code/Rust-Code/Github/datafusion/datafusion-examples/Cargo.toml -- scalar_function_expr
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.19s
     Running `target/debug/examples/proto scalar_function_expr`
Usage: cargo run --example proto -- [all|composed_extension_codec|expression_deduplication|scalar_function_expr]
=== ScalarFunctionExpr Proto Round Trip Example ===

Step 1: Built physical expression: sqrt(a@0)
Step 2: Serialized to proto: name=sqrt, args=1, has_fun_definition=false
Step 3: Deserialized expression: sqrt(a@0)
Step 4: Evaluated both expressions successfully
 input: [4.0, 9.0, 16.0]
 output: PrimitiveArray<Float64>
[
  2.0,
  3.0,
  4.0,
]

Process finished with exit code 0
```

Note

In this example, serialize_physical_expr only converts the Rust PhysicalExpr into the protobuf-generated Rust struct (PhysicalExprNode). It does not go on to encode that struct into binary bytes. To send it over the network you still need prost::Message::encode.
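prost performs that byte-level encoding for you. Because `PhysicalExprNode` is a prost-generated message, `proto.encode_to_vec()` and `PhysicalExprNode::decode(bytes.as_slice())` from the `prost::Message` trait (with `use prost::Message;`) should be all that is needed. To get a feel for what actually goes over the wire, here is a std-only sketch of the protobuf wire format (varints plus length-delimited fields) for a made-up message `FnCall { string name = 1; uint32 arg_count = 2; }`. The message, its field numbers and the helper names are hypothetical and chosen for illustration. This is not DataFusion's real `ScalarUdf` schema.

```rust
/// Append `value` as a protobuf base-128 varint: 7 bits per byte, least
/// significant group first, high bit set on every byte except the last.
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    while value >= 0x80 {
        buf.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// A field key is `(field_number << 3) | wire_type`, itself written as a varint.
fn put_key(buf: &mut Vec<u8>, field_number: u32, wire_type: u8) {
    put_varint(buf, (u64::from(field_number) << 3) | u64::from(wire_type));
}

const WIRE_VARINT: u8 = 0; // integers, bools, enums
const WIRE_LEN: u8 = 2; // strings, bytes, nested messages

/// Encode the hypothetical message
///     message FnCall { string name = 1; uint32 arg_count = 2; }
/// Real proto3 encoders (such as prost) skip fields that hold default values;
/// this sketch always writes both fields.
pub fn encode_fn_call(name: &str, arg_count: u32) -> Vec<u8> {
    let mut buf = Vec::new();
    put_key(&mut buf, 1, WIRE_LEN);
    put_varint(&mut buf, name.len() as u64); // length prefix in bytes
    buf.extend_from_slice(name.as_bytes());
    put_key(&mut buf, 2, WIRE_VARINT);
    put_varint(&mut buf, u64::from(arg_count));
    buf
}
```

For `encode_fn_call("sqrt", 1)` the sketch produces 8 bytes (`0A 04 73 71 72 74 10 01`). The equivalent JSON text `{"name":"sqrt","arg_count":1}` takes 29 bytes. Only field numbers reach the wire, never field names, which is why a decoder needs the `.proto` schema.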

Writing Rust's match in Java

In Java, a switch statement can provide functionality similar to Rust's match. The example below shows how to emulate Rust's match syntax in Java using switch.

```java
public enum ServiceType {
    RESUME_COMMUNICATION(1, "复课沟通"),     // resume-class communication
    CLASS_PLANNING(2, "班级规划"),           // class planning
    STUDENT_PLANNING(3, "学员规划"),         // student planning
    STUDENT_INVENTORY(4, "学员盘点"),        // student review
    RENEWAL_INVENTORY_LOCK(5, "续报盘点锁定"), // renewal review lock
    SATISFACTION_SURVEY(6, "满意度问卷");     // satisfaction survey

    private final Integer code;
    private final String description;

    ServiceType(Integer code, String description) {
        this.code = code;
        this.description = description;
    }

    public Integer getCode() {
        return code;
    }

    public String getDescription() {
        return description;
    }

    public static ServiceType fromCode(Integer code) {
        for (ServiceType type : values()) {
            if (type.code.equals(code)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown service type code: " + code);
    }

    // Emulates Rust's match expression: one Supplier per constant, and the
    // Supplier matching `this` is the one that gets evaluated.
    public <T> T match(
            java.util.function.Supplier<T> resumeCommunication,
            java.util.function.Supplier<T> classPlanning,
            java.util.function.Supplier<T> studentPlanning,
            java.util.function.Supplier<T> studentInventory,
            java.util.function.Supplier<T> renewalInventoryLock,
            java.util.function.Supplier<T> satisfactionSurvey
    ) {
        switch (this) {
            case RESUME_COMMUNICATION:
                return resumeCommunication.get();
            case CLASS_PLANNING:
                return classPlanning.get();
            case STUDENT_PLANNING:
                return studentPlanning.get();
            case STUDENT_INVENTORY:
                return studentInventory.get();
            case RENEWAL_INVENTORY_LOCK:
                return renewalInventoryLock.get();
            case SATISFACTION_SURVEY:
                return satisfactionSurvey.get();
            default:
                // Required: the compiler does not treat this switch statement as exhaustive.
                throw new IllegalStateException("Unexpected value: " + this);
        }
    }
}
```

Usage example:
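For comparison, here is a sketch of the native Rust code that the Java `match` method imitates. The enum mirrors the Java constants. The method names `from_code`, `code` and `description` were chosen for this sketch.

```rust
/// Rust counterpart of the Java `ServiceType` enum (a sketch mirroring its constants).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceType {
    ResumeCommunication,
    ClassPlanning,
    StudentPlanning,
    StudentInventory,
    RenewalInventoryLock,
    SatisfactionSurvey,
}

impl ServiceType {
    /// Like Java's `fromCode`, but an unknown code yields `None` instead of an exception.
    pub fn from_code(code: i32) -> Option<Self> {
        match code {
            1 => Some(Self::ResumeCommunication),
            2 => Some(Self::ClassPlanning),
            3 => Some(Self::StudentPlanning),
            4 => Some(Self::StudentInventory),
            5 => Some(Self::RenewalInventoryLock),
            6 => Some(Self::SatisfactionSurvey),
            _ => None, // matching on an i32 does need a catch-all arm
        }
    }

    /// `match` on the enum is an expression and must be exhaustive: no default
    /// arm is needed, and adding a variant makes this a compile error until handled.
    pub fn code(self) -> i32 {
        match self {
            Self::ResumeCommunication => 1,
            Self::ClassPlanning => 2,
            Self::StudentPlanning => 3,
            Self::StudentInventory => 4,
            Self::RenewalInventoryLock => 5,
            Self::SatisfactionSurvey => 6,
        }
    }

    pub fn description(self) -> &'static str {
        match self {
            Self::ResumeCommunication => "复课沟通",
            Self::ClassPlanning => "班级规划",
            Self::StudentPlanning => "学员规划",
            Self::StudentInventory => "学员盘点",
            Self::RenewalInventoryLock => "续报盘点锁定",
            Self::SatisfactionSurvey => "满意度问卷",
        }
    }
}
```

The six `Supplier` parameters of the Java `match` play the role of the six arms here. The Rust compiler checks that every variant is covered. The classic Java `switch` statement above relies on its `default` branch instead and can only fail at runtime.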

Rust GUI

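Using gpui

The difference between Render and RenderOnce

Render: for stateful views
- Implemented by types that are stored in an Entity. Its render method takes &mut self, so the view and its state persist between renders.
- The view is typically re-rendered when its window redraws, for example after cx.notify() signals that its state changed.
- Suited to dynamic content that owns and updates its own state.

RenderOnce: for stateless components
- Its render method takes self by value, so the component is consumed when it is rendered. The parent builds a fresh instance every time the parent renders.
- Holds no state of its own between renders.
- Cheap to construct. Suited to reusable presentational pieces such as buttons and labels.

Choosing the right trait in GPUI keeps state where it belongs and keeps components lightweight.

What is Entity

Entity is a smart-pointer type in the GPUI framework, roughly comparable to the ref mechanism in React. It is used to manage the lifecycle and state of UI components.

Main uses of Entity

1. Creating an Entity

An Entity is created with cx.new(), which takes a closure that initializes the component:

```rust
pub fn view(window: &mut Window, cx: &mut App) -> Entity<Self> {
    cx.new(|cx| Self::new(window, cx))
}
```

2. Storing an Entity in a struct

Entities are often used as struct fields that hold references to child components:

```rust
pub struct Example {
    root: Entity<ButtonStory>,
}
```

3. Creating child components in the constructor

```rust
impl Example {
    pub fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
        let root = ButtonStory::view(window, cx);
        Self { root }
    }
}
```

4. Using an Entity in the render method

An Entity can be used directly as a child element, because it implements the IntoElement trait.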
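The practical difference between Render and RenderOnce comes down to `&mut self` versus `self`. The std-only toy model below shows a stateful view that keeps its click count across renders behind a shared handle, and a stateless label that is rebuilt and consumed on every render. The traits `ToyRender`/`ToyRenderOnce`, the `ToyEntity` alias and the `Counter`/`Label` types are all hypothetical and are not gpui's API.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy stand-in for gpui's `Render` (NOT the real trait): `&mut self` means the
/// view survives the call and keeps its state for the next render.
trait ToyRender {
    fn render(&mut self) -> String;
}

/// Toy stand-in for gpui's `RenderOnce`: `self` by value means the component
/// is consumed by rendering, so the parent must build a new one next time.
trait ToyRenderOnce {
    fn render(self) -> String;
}

/// Toy stand-in for `Entity<T>`: a cloneable, shared handle to state.
type ToyEntity<T> = Rc<RefCell<T>>;

/// A stateful view; in gpui it would be created with `cx.new(...)` and held in an Entity.
struct Counter {
    clicks: u32,
}

impl Counter {
    fn click(&mut self) {
        self.clicks += 1;
    }
}

/// A stateless component, rebuilt from scratch on every parent render.
struct Label {
    text: String,
}

impl ToyRenderOnce for Label {
    fn render(self) -> String {
        format!("[{}]", self.text)
    }
}

impl ToyRender for Counter {
    fn render(&mut self) -> String {
        // Build a fresh Label from the current state and consume it.
        Label { text: format!("clicks: {}", self.clicks) }.render()
    }
}

/// Render a view through its shared handle, as a parent holding the handle would.
fn render_view<V: ToyRender>(view: &ToyEntity<V>) -> String {
    view.borrow_mut().render()
}
```

Cloning the `Rc` gives a second handle to the same `Counter`, much as cloning an Entity does. A click through one handle is visible the next time the view renders through the other.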

Rust Snippets

How to debug a Rust procedural attribute macro

Calling println!() inside the macro prints the expanded code. A proc macro runs inside the compiler, so the output appears while the crate that uses the macro is being compiled.

```rust
use proc_macro::TokenStream;
use quote::{format_ident, quote};
use syn::{FnArg, GenericParam, ItemFn, Result};

// `utils::get_crate_name` is a helper from the author's crate (not shown).
fn generate_handler(internal: bool, input: TokenStream) -> Result<TokenStream> {
    let crate_name = utils::get_crate_name(internal);
    println!("crate_name: {:?}", crate_name);

    let item_fn = syn::parse::<ItemFn>(input)?;
    let (impl_generics, type_generics, where_clause) = item_fn.sig.generics.split_for_impl();
    let vis = &item_fn.vis;
    let docs = item_fn
        .attrs
        .iter()
        .filter(|attr| attr.path().is_ident("doc"))
        .cloned()
        .collect::<Vec<_>>();
    let ident = &item_fn.sig.ident;
    let call_await = if item_fn.sig.asyncness.is_some() {
        Some(quote::quote!(.await))
    } else {
        None
    };

    let def_struct = if !item_fn.sig.generics.params.is_empty() {
        let iter = item_fn
            .sig
            .generics
            .params
            .iter()
            .filter_map(|param| match param {
                GenericParam::Type(ty) => Some(ty),
                _ => None,
            })
            .enumerate()
            .map(|(idx, ty)| {
                let ident = format_ident!("_mark{}", idx);
                let ty_ident = &ty.ident;
                (ident, ty_ident)
            });
        let struct_members = iter.clone().map(|(ident, ty_ident)| {
            quote! { #ident: ::std::marker::PhantomData<#ty_ident> }
        });
        let default_members = iter.clone().map(|(ident, _ty_ident)| {
            quote! { #ident: ::std::marker::PhantomData }
        });
        quote! {
            #vis struct #ident #type_generics { #(#struct_members),* }
            impl #type_generics ::std::default::Default for #ident #type_generics {
                fn default() -> Self {
                    Self { #(#default_members),* }
                }
            }
        }
    } else {
        quote! { #vis struct #ident; }
    };

    let mut extractors = Vec::new();
    let mut args = Vec::new();
    for (idx, input) in item_fn.sig.inputs.clone().into_iter().enumerate() {
        if let FnArg::Typed(pat) = input {
            let ty = &pat.ty;
            let id = quote::format_ident!("p{}", idx);
            args.push(id.clone());
            extractors.push(quote! {
                let #id = <#ty as #crate_name::FromRequest>::from_request(&req, &mut body).await?;
            });
        }
    }

    let expanded = quote! {
        #(#docs)*
        #[allow(non_camel_case_types)]
        #def_struct

        impl #impl_generics #crate_name::Endpoint for #ident #type_generics #where_clause {
            type Output = #crate_name::Response;
            //println!(output: #crate_name::Response);
            //println!(Output);

            #[allow(unused_mut)]
            async fn call(&self, mut req: #crate_name::Request) -> #crate_name::Result<Self::Output> {
                let (req, mut body) = req.split();
                #(#extractors)*
                #item_fn
                let res = #ident(#(#args),*)#call_await;
                let res = #crate_name::error::IntoResult::into_result(res);
                std::result::Result::map(res, #crate_name::IntoResponse::into_response)
            }
        }
    };

    // Print the generated code at compile time.
    println!("Expanded code: {}", expanded);
    Ok(expanded.into())
}
```

Clearing the local cargo cache

Option 1: Use the cargo-cache tool (recommended)

First install the tool if you don't have it:

Implementing an HTTP Web Server in Rust

Implementing a web server in Rust, part 1

Lately I've been spending my spare time learning Rust and using it to build some wheels from scratch. The first wheel is a web server. At its core, a web server is about Request and Response: parse the request, match it to one of the route handlers registered when the server was initialized, let that handler do its work, and send its result back.

Synchronous version

The program:
1. starts a TCP server
2. parses and handles the request
3. matches a route
4. returns a response

Example code:

```rust
// main.rs: a binary that uses the sync_core crate
use sync_core::route::Route;
use sync_core::server::Server;
use sync_core::service::Service;

fn main() {
    // Initialize trace logging
    tracing_subscriber::fmt::init();

    let mut server = Server::new(Service::new());
    let route = Route::new("GET".to_string(), "/hello".to_string(), || {
        "Hello World".to_string()
    });
    // hello world 2: returns an integer value (as a string)
    let route2 = Route::new("GET".to_string(), "/hello2".to_string(), || {
        // i32 value
        42.to_string()
    });

    // Register the routes with the server
    server.service.routes.push(route);
    server.service.routes.push(route2);
    server.start();
}
```

```rust
// module sync_core::server
use crate::service::Service;
use log::info;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

pub struct Server {
    pub service: Service,
}

impl Server {
    pub fn new(service: Service) -> Self {
        Self { service }
    }

    pub fn start(&self) {
        let addr = "127.0.0.1:8080";
        let listener = TcpListener::bind(addr).unwrap();
        info!("Listening on http://{}", addr);
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => self.handle_connection(stream),
                Err(e) => eprintln!("failed: {}", e),
            }
        }
    }

    fn handle_connection(&self, mut tcp_stream: TcpStream) {
        let mut buffer = [0; 1024];
        // Only the first `n` bytes belong to the request; the rest of the buffer is zeros.
        let n = match tcp_stream.read(&mut buffer) {
            Ok(n) => n,
            Err(e) => {
                eprintln!("read failed: {}", e);
                return;
            }
        };
        let request = String::from_utf8_lossy(&buffer[..n]);
        info!("Request: {}", request);

        // Parse the request line, e.g. "GET /hello HTTP/1.1"
        let parts = request.split_whitespace().collect::<Vec<&str>>();
        if parts.len() < 3 {
            // Empty or malformed request: indexing below would panic
            return;
        }
        let (method, path, version) = (parts[0], parts[1], parts[2]);
        info!("Method: {}", method);
        info!("Path: {}", path);
        info!("Version: {}", version);

        // Find a matching route
        let route = self
            .service
            .routes
            .iter()
            .find(|r| r.method == method && r.path == path);
        let (status, body) = match route {
            Some(route) => ("200 OK", (route.handler)()),
            // An unknown route is a 404, not a 200
            None => ("404 Not Found", "Not Found Route".to_string()),
        };
        let response = format!(
            "HTTP/1.1 {}\r\nContent-Length: {}\r\n\r\n{}",
            status,
            body.len(),
            body
        );
        // Unlike write, write_all keeps writing until the whole response is sent
        if let Err(e) = tcp_stream.write_all(response.as_bytes()) {
            eprintln!("write failed: {}", e);
        }
    }
}
```

```rust
// module sync_core::service
use crate::route::Route;

pub struct Service {
    pub routes: Vec<Route>,
}

impl Service {
    pub fn new() -> Self {
        Self { routes: Vec::new() }
    }
}
```

```rust
// module sync_core::route
pub struct Route {
    pub method: String,
    pub path: String,
    pub handler: fn() -> String,
}

impl Route {
    pub fn new(method: String, path: String, handler: fn() -> String) -> Self {
        Self {
            method,
            path,
            handler,
        }
    }
}
```
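The request-to-response step can be pulled out into a pure function that is easy to test without opening a socket. The following is a simplified, std-only sketch. `respond`, `build_response` and the `&'static str` fields on this `Route` are assumptions made for the example. Unknown routes get `404 Not Found`, malformed requests get `400 Bad Request`, and every response carries a `Content-Length` header so the client knows where the body ends.

```rust
/// Simplified route (hypothetical: `&'static str` instead of the post's `String` fields).
pub struct Route {
    pub method: &'static str,
    pub path: &'static str,
    pub handler: fn() -> String,
}

/// Turn one raw HTTP request into a raw HTTP/1.1 response string.
pub fn respond(raw_request: &str, routes: &[Route]) -> String {
    // The request line is the first line, e.g. "GET /hello HTTP/1.1".
    let request_line = raw_request.lines().next().unwrap_or("");
    let mut parts = request_line.split_whitespace();
    let (method, path) = match (parts.next(), parts.next()) {
        (Some(method), Some(path)) => (method, path),
        // Empty or malformed request line: answer 400 instead of panicking on an index.
        _ => return build_response("400 Bad Request", "Bad Request"),
    };
    match routes.iter().find(|r| r.method == method && r.path == path) {
        Some(route) => build_response("200 OK", &(route.handler)()),
        None => build_response("404 Not Found", "Not Found Route"),
    }
}

/// Status line, headers, blank line, body. Content-Length counts bytes,
/// which is what `str::len` returns.
fn build_response(status: &str, body: &str) -> String {
    format!(
        "HTTP/1.1 {status}\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    )
}
```

Inside `handle_connection`, the bytes that were read would be passed to `respond`, and the returned string would be written back with `write_all`.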

Web Gateway Series 02: Proxying Traffic over HTTP

Differences between the modes:
- Direct mode
- Proxy mode

Direct mode

In direct mode, services call each other directly. A common example is typing baidu into the browser: the browser sends the request and then displays Baidu's response. In this process the user plays the role of service-A, and Baidu's server plays the role of service-B.

The code below reproduces this process with Java's java.net.HttpURLConnection class. Besides HttpURLConnection, many Java libraries wrap HTTP access, for example:
- HttpClient from Apache Commons
- CloseableHttpClient from Apache HttpComponents
- RestTemplate in Spring Boot

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DirectMode {
    public static void main(String[] args) {
        String api = "https://www.baidu.com";
        HttpURLConnection connection = null;
        try {
            // Build a URL object
            URL url = new URL(api);
            // Get the HttpURLConnection object
            connection = (HttpURLConnection) url.openConnection();
            // getInputStream() connects implicitly (as if connect() had been called),
            // so calling connect() explicitly is optional.
            // InputStreamReader turns the byte stream into characters, and BufferedReader
            // adds buffering. try-with-resources closes the reader and the underlying stream.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder sb = new StringBuilder();
                String line;
                // Read line by line
                while ((line = reader.readLine()) != null) {
                    sb.append(line);
                }
                String response = sb.toString();
                System.out.println(response);
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        } finally {
            // Release the connection after the streams have been closed
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}
```

The code above prints Baidu's response, the same content you get by visiting Baidu in a browser. The browser simply renders it with nicer styling.
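The same direct-mode exchange can be sketched in Rust with only the standard library. To stay self-contained, the sketch does not contact www.baidu.com. Instead, a hypothetical one-shot server bound to 127.0.0.1 plays service-B, and the client function plays service-A, sending a raw HTTP/1.1 GET straight to it with nothing in between. It speaks plain TCP, so HTTPS is out of scope. The function names are illustrative.

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread::{self, JoinHandle};

/// A one-shot local stand-in for "service-B": answers a single request with "hello".
fn spawn_service_b() -> std::io::Result<(SocketAddr, JoinHandle<()>)> {
    // Port 0 asks the OS for any free port, so no fixed port is needed.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let handle = thread::spawn(move || {
        let Ok((mut stream, _)) = listener.accept() else { return };
        // Read until the blank line that ends the request headers (a GET has no body).
        let mut request: Vec<u8> = Vec::new();
        let mut chunk = [0u8; 512];
        while !request.windows(4).any(|w| w == b"\r\n\r\n") {
            match stream.read(&mut chunk) {
                Ok(0) | Err(_) => break,
                Ok(n) => request.extend_from_slice(&chunk[..n]),
            }
        }
        let _ = stream.write_all(
            b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\nConnection: close\r\n\r\nhello",
        );
        // `stream` is dropped here, which closes the connection.
    });
    Ok((addr, handle))
}

/// "service-A": connect straight to service-B, send a GET, and return the response body.
fn direct_get(addr: SocketAddr) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    // Build the whole request first and send it with one write_all call.
    let request = format!("GET / HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    stream.write_all(request.as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?; // service-B closes the connection when done
    // The body is everything after the first blank line.
    let body = response.split_once("\r\n\r\n").map_or("", |(_, body)| body);
    Ok(body.to_string())
}

/// Run the whole direct-mode round trip on localhost.
pub fn direct_mode_demo() -> std::io::Result<String> {
    let (addr, service_b) = spawn_service_b()?;
    let body = direct_get(addr)?;
    service_b.join().expect("service-B thread panicked");
    Ok(body)
}
```

The server reads the complete request before replying. This matters because closing a socket that still has unread input can make the OS reset the connection instead of closing it cleanly.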