diff --git a/Cargo.lock b/Cargo.lock index 73644261c..e8a6c7d08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2475,49 +2475,6 @@ dependencies = [ "util", ] -[[package]] -name = "clickhouse" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0875e527e299fc5f4faba42870bf199a39ab0bb2dbba1b8aef0a2151451130f" -dependencies = [ - "bstr", - "bytes 1.8.0", - "clickhouse-derive", - "clickhouse-rs-cityhash-sys", - "futures 0.3.31", - "hyper 0.14.31", - "hyper-tls", - "lz4", - "sealed", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "url", -] - -[[package]] -name = "clickhouse-derive" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18af5425854858c507eec70f7deb4d5d8cec4216fcb086283a78872387281ea5" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals 0.26.0", - "syn 1.0.109", -] - -[[package]] -name = "clickhouse-rs-cityhash-sys" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9" -dependencies = [ - "cc", -] - [[package]] name = "client" version = "0.1.0" @@ -2668,7 +2625,6 @@ dependencies = [ "call", "channel", "chrono", - "clickhouse", "client", "clock", "collab_ui", @@ -7336,25 +7292,6 @@ dependencies = [ "url", ] -[[package]] -name = "lz4" -version = "1.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" -dependencies = [ - "lz4-sys", -] - -[[package]] -name = "lz4-sys" -version = "1.11.1+lz4-1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "mac" version = "0.1.1" @@ -11034,7 +10971,7 @@ checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" dependencies = [ "proc-macro2", "quote", - "serde_derive_internals 0.29.1", + "serde_derive_internals", "syn 2.0.87", ] @@ -11171,18 +11108,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "sealed" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b5e421024b5e5edfbaa8e60ecf90bda9dbffc602dbb230e6028763f85f0c68c" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "search" version = "0.1.0" @@ -11330,17 +11255,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "serde_derive_internals" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "serde_derive_internals" version = "0.29.1" diff --git a/Cargo.toml b/Cargo.toml index 776425580..faade8c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -360,7 +360,6 @@ cargo_metadata = "0.19" cargo_toml = "0.20" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["derive"] } -clickhouse = "0.11.6" cocoa = "0.26" cocoa-foundation = "0.2.0" convert_case = "0.6.0" diff --git a/crates/collab/.env.toml b/crates/collab/.env.toml index 5d292387c..e775179f6 100644 --- a/crates/collab/.env.toml +++ b/crates/collab/.env.toml @@ -19,11 +19,6 @@ LLM_DATABASE_URL = "postgres://postgres@localhost/zed_llm" LLM_DATABASE_MAX_CONNECTIONS = 5 LLM_API_SECRET = "llm-secret" -# CLICKHOUSE_URL = "" -# CLICKHOUSE_USER = "default" -# CLICKHOUSE_PASSWORD = "" -# CLICKHOUSE_DATABASE = "default" - # SLACK_PANICS_WEBHOOK = "" # RUST_LOG=info diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index d08bcfa18..996ee3984 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -29,7 +29,6 @@ axum = { version = "0.6", features = ["json", "headers", "ws"] } axum-extra = { version = "0.4", features = ["erased-json"] } base64.workspace = true chrono.workspace = true -clickhouse.workspace = true clock.workspace = true collections.workspace = true dashmap.workspace = true diff --git a/crates/collab/k8s/collab.template.yml b/crates/collab/k8s/collab.template.yml index 89921f242..588e4ac54 100644 --- a/crates/collab/k8s/collab.template.yml +++ b/crates/collab/k8s/collab.template.yml @@ -214,26 +214,6 @@ spec: secretKeyRef: name: blob-store key: bucket - - name: CLICKHOUSE_URL - valueFrom: - secretKeyRef: - name: clickhouse - key: url - - name: CLICKHOUSE_USER - valueFrom: - secretKeyRef: - name: clickhouse - key: user - - name: CLICKHOUSE_PASSWORD - valueFrom: - secretKeyRef: - name: clickhouse - key: password - - name: CLICKHOUSE_DATABASE - valueFrom: - secretKeyRef: - name: clickhouse - key: database - name: SLACK_PANICS_WEBHOOK valueFrom: secretKeyRef: diff --git a/crates/collab/src/api/events.rs b/crates/collab/src/api/events.rs index 15085db2c..ca2c91de2 100644 --- a/crates/collab/src/api/events.rs +++ b/crates/collab/src/api/events.rs @@ -1,8 +1,7 @@ use super::ips_file::IpsFile; use crate::api::CloudflareIpCountryHeader; -use crate::clickhouse::write_to_table; use crate::{api::slack, AppState, Error, Result}; -use anyhow::{anyhow, Context}; +use anyhow::anyhow; use aws_sdk_s3::primitives::ByteStream; use axum::{ body::Bytes, @@ -12,17 +11,12 @@ use axum::{ Extension, Router, TypedHeader, }; use chrono::Duration; -use rpc::ExtensionMetadata; use semantic_version::SemanticVersion; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use serde_json::json; use sha2::{Digest, Sha256}; use std::sync::{Arc, OnceLock}; -use telemetry_events::{ - ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event, - EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic, - ReplEvent, SettingEvent, -}; +use telemetry_events::{Event, EventRequestBody, Panic}; use util::ResultExt; use uuid::Uuid; @@ -406,7 +400,6 @@ pub async fn post_events( Error::Internal(anyhow!(err)) })?; - let mut to_upload = ToUpload::default(); let Some(last_event) = request_body.events.last() else { return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?; }; @@ -418,7 +411,12 @@ pub async fn post_events( if let Some(kinesis_client) = app.kinesis_client.clone() { if let Some(stream) = app.config.kinesis_stream.clone() { let mut request = kinesis_client.put_records().stream_name(stream); - for row in for_snowflake(request_body.clone(), first_event_at, country_code.clone()) { + for row in for_snowflake( + request_body.clone(), + first_event_at, + country_code.clone(), + checksum_matched, + ) { if let Some(data) = serde_json::to_vec(&row).log_err() { request = request.records( aws_sdk_kinesis::types::PutRecordsRequestEntry::builder() @@ -433,941 +431,9 @@ pub async fn post_events( } }; - let Some(clickhouse_client) = app.clickhouse_client.clone() else { - Err(Error::http( - StatusCode::NOT_IMPLEMENTED, - "not supported".into(), - ))? - }; - - let first_event_at = chrono::Utc::now() - - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event); - - for wrapper in &request_body.events { - match &wrapper.event { - Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - country_code.clone(), - checksum_matched, - )), - Event::InlineCompletion(event) => { - to_upload - .inline_completion_events - .push(InlineCompletionEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - country_code.clone(), - checksum_matched, - )) - } - Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - Event::Assistant(event) => { - to_upload - .assistant_events - .push(AssistantEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )) - } - Event::Cpu(_) | Event::Memory(_) | Event::InlineCompletionRating(_) => continue, - Event::App(event) => to_upload.app_events.push(AppEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - Event::Extension(event) => { - let metadata = app - .db - .get_extension_version(&event.extension_id, &event.version) - .await?; - to_upload - .extension_events - .push(ExtensionEventRow::from_event( - event.clone(), - wrapper, - &request_body, - metadata, - first_event_at, - checksum_matched, - )) - } - Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event( - event.clone(), - wrapper, - &request_body, - first_event_at, - checksum_matched, - )), - } - } - - to_upload - .upload(&clickhouse_client) - .await - .map_err(|err| Error::Internal(anyhow!(err)))?; - Ok(()) } -#[derive(Default)] -struct ToUpload { - editor_events: Vec, - inline_completion_events: Vec, - assistant_events: Vec, - call_events: Vec, - cpu_events: Vec, - memory_events: Vec, - app_events: Vec, - setting_events: Vec, - extension_events: Vec, - edit_events: Vec, - action_events: Vec, - repl_events: Vec, -} - -impl ToUpload { - pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> { - const EDITOR_EVENTS_TABLE: &str = "editor_events"; - write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?; - - const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events"; - write_to_table( - INLINE_COMPLETION_EVENTS_TABLE, - &self.inline_completion_events, - clickhouse_client, - ) - .await - .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?; - - const ASSISTANT_EVENTS_TABLE: &str = "assistant_events"; - write_to_table( - ASSISTANT_EVENTS_TABLE, - &self.assistant_events, - clickhouse_client, - ) - .await - .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?; - - const CALL_EVENTS_TABLE: &str = "call_events"; - write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?; - - const CPU_EVENTS_TABLE: &str = "cpu_events"; - write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?; - - const MEMORY_EVENTS_TABLE: &str = "memory_events"; - write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?; - - const APP_EVENTS_TABLE: &str = "app_events"; - write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?; - - const SETTING_EVENTS_TABLE: &str = "setting_events"; - write_to_table( - SETTING_EVENTS_TABLE, - &self.setting_events, - clickhouse_client, - ) - .await - .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?; - - const EXTENSION_EVENTS_TABLE: &str = "extension_events"; - write_to_table( - EXTENSION_EVENTS_TABLE, - &self.extension_events, - clickhouse_client, - ) - .await - .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?; - - const EDIT_EVENTS_TABLE: &str = "edit_events"; - write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?; - - const ACTION_EVENTS_TABLE: &str = "action_events"; - write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?; - - const REPL_EVENTS_TABLE: &str = "repl_events"; - write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client) - .await - .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?; - - Ok(()) - } -} - -pub fn serialize_country_code(country_code: &str, serializer: S) -> Result -where - S: Serializer, -{ - if country_code.len() != 2 { - use serde::ser::Error; - return Err(S::Error::custom( - "country_code must be exactly 2 characters", - )); - } - - let country_code = country_code.as_bytes(); - - serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16) -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct EditorEventRow { - system_id: String, - installation_id: String, - session_id: Option, - metrics_id: String, - operation: String, - app_version: String, - file_extension: String, - os_name: String, - os_version: String, - release_channel: String, - signed_in: bool, - vim_mode: bool, - #[serde(serialize_with = "serialize_country_code")] - country_code: String, - region_code: String, - city: String, - time: i64, - copilot_enabled: bool, - copilot_enabled_for_language: bool, - architecture: String, - is_staff: Option, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - is_via_ssh: bool, -} - -impl EditorEventRow { - fn from_event( - event: EditorEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - country_code: Option, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - architecture: body.architecture.clone(), - system_id: body.system_id.clone().unwrap_or_default(), - installation_id: body.installation_id.clone().unwrap_or_default(), - session_id: body.session_id.clone(), - metrics_id: body.metrics_id.clone().unwrap_or_default(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - operation: event.operation, - file_extension: event.file_extension.unwrap_or_default(), - signed_in: wrapper.signed_in, - vim_mode: event.vim_mode, - copilot_enabled: event.copilot_enabled, - copilot_enabled_for_language: event.copilot_enabled_for_language, - country_code: country_code.unwrap_or("XX".to_string()), - region_code: "".to_string(), - city: "".to_string(), - is_via_ssh: event.is_via_ssh, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct InlineCompletionEventRow { - installation_id: String, - session_id: Option, - provider: String, - suggestion_accepted: bool, - app_version: String, - file_extension: String, - os_name: String, - os_version: String, - release_channel: String, - signed_in: bool, - #[serde(serialize_with = "serialize_country_code")] - country_code: String, - region_code: String, - city: String, - time: i64, - is_staff: Option, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, -} - -impl InlineCompletionEventRow { - fn from_event( - event: InlineCompletionEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - country_code: Option, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone().unwrap_or_default(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - file_extension: event.file_extension.unwrap_or_default(), - signed_in: wrapper.signed_in, - country_code: country_code.unwrap_or("XX".to_string()), - region_code: "".to_string(), - city: "".to_string(), - provider: event.provider, - suggestion_accepted: event.suggestion_accepted, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct CallEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - release_channel: String, - os_name: String, - os_version: String, - checksum_matched: bool, - - // ClientEventBase - installation_id: String, - session_id: Option, - is_staff: Option, - time: i64, - - // CallEventRow - operation: String, - room_id: Option, - channel_id: Option, -} - -impl CallEventRow { - fn from_event( - event: CallEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone().unwrap_or_default(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - operation: event.operation, - room_id: event.room_id, - channel_id: event.channel_id, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct AssistantEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - - // AssistantEventRow - conversation_id: String, - kind: String, - phase: String, - model: String, - response_latency_in_ms: Option, - error_message: Option, -} - -impl AssistantEventRow { - fn from_event( - event: AssistantEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - conversation_id: event.conversation_id.unwrap_or_default(), - kind: event.kind.to_string(), - phase: event.phase.to_string(), - model: event.model, - response_latency_in_ms: event - .response_latency - .map(|latency| latency.as_millis() as i64), - error_message: event.error_message, - } - } -} - -#[derive(Debug, clickhouse::Row, Serialize)] -pub struct CpuEventRow { - installation_id: Option, - session_id: Option, - is_staff: Option, - usage_as_percentage: f32, - core_count: u32, - app_version: String, - release_channel: String, - os_name: String, - os_version: String, - time: i64, - // pub normalized_cpu_usage: f64, MATERIALIZED - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, -} - -impl CpuEventRow { - #[allow(unused)] - fn from_event( - event: CpuEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - usage_as_percentage: event.usage_as_percentage, - core_count: event.core_count, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct MemoryEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - - // MemoryEventRow - memory_in_bytes: u64, - virtual_memory_in_bytes: u64, -} - -impl MemoryEventRow { - #[allow(unused)] - fn from_event( - event: MemoryEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - memory_in_bytes: event.memory_in_bytes, - virtual_memory_in_bytes: event.virtual_memory_in_bytes, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct AppEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - - // AppEventRow - operation: String, -} - -impl AppEventRow { - fn from_event( - event: AppEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - operation: event.operation, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct SettingEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - // SettingEventRow - setting: String, - value: String, -} - -impl SettingEventRow { - fn from_event( - event: SettingEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - checksum_matched, - patch: semver.map(|v| v.patch() as i32), - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - setting: event.setting, - value: event.value, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct ExtensionEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - - // ExtensionEventRow - extension_id: Arc, - extension_version: Arc, - dev: bool, - schema_version: Option, - wasm_api_version: Option, -} - -impl ExtensionEventRow { - fn from_event( - event: ExtensionEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - extension_metadata: Option, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - extension_id: event.extension_id, - extension_version: event.version, - dev: extension_metadata.is_none(), - schema_version: extension_metadata - .as_ref() - .and_then(|metadata| metadata.manifest.schema_version), - wasm_api_version: extension_metadata.as_ref().and_then(|metadata| { - metadata - .manifest - .wasm_api_version - .as_ref() - .map(|version| version.to_string()) - }), - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct ReplEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - session_id: Option, - is_staff: Option, - time: i64, - - // ReplEventRow - kernel_language: String, - kernel_status: String, - repl_session_id: String, -} - -impl ReplEventRow { - fn from_event( - event: ReplEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - kernel_language: event.kernel_language, - kernel_status: event.kernel_status, - repl_session_id: event.repl_session_id, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct EditEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - // Note: This column name has a typo in the ClickHouse table. - #[serde(rename = "sesssion_id")] - session_id: Option, - is_staff: Option, - time: i64, - - // EditEventRow - period_start: i64, - period_end: i64, - environment: String, - is_via_ssh: bool, -} - -impl EditEventRow { - fn from_event( - event: EditEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - let period_start = time - chrono::Duration::milliseconds(event.duration); - let period_end = time; - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - period_start: period_start.timestamp_millis(), - period_end: period_end.timestamp_millis(), - environment: event.environment, - is_via_ssh: event.is_via_ssh, - } - } -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct ActionEventRow { - // AppInfoBase - app_version: String, - major: Option, - minor: Option, - patch: Option, - checksum_matched: bool, - release_channel: String, - os_name: String, - os_version: String, - - // ClientEventBase - installation_id: Option, - // Note: This column name has a typo in the ClickHouse table. - #[serde(rename = "sesssion_id")] - session_id: Option, - is_staff: Option, - time: i64, - // ActionEventRow - source: String, - action: String, -} - -impl ActionEventRow { - fn from_event( - event: ActionEvent, - wrapper: &EventWrapper, - body: &EventRequestBody, - first_event_at: chrono::DateTime, - checksum_matched: bool, - ) -> Self { - let semver = body.semver(); - let time = - first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event); - - Self { - app_version: body.app_version.clone(), - major: semver.map(|v| v.major() as i32), - minor: semver.map(|v| v.minor() as i32), - patch: semver.map(|v| v.patch() as i32), - checksum_matched, - release_channel: body.release_channel.clone().unwrap_or_default(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone().unwrap_or_default(), - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - is_staff: body.is_staff, - time: time.timestamp_millis(), - source: event.source, - action: event.action, - } - } -} - pub fn calculate_json_checksum(app: Arc, json: &impl AsRef<[u8]>) -> Option> { let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?; @@ -1382,6 +448,7 @@ fn for_snowflake( body: EventRequestBody, first_event_at: chrono::DateTime, country_code: Option, + checksum_matched: bool, ) -> impl Iterator { body.events.into_iter().flat_map(move |event| { let timestamp = @@ -1555,6 +622,7 @@ fn for_snowflake( body.release_channel.clone().into(), ); map.insert("signed_in".to_string(), event.signed_in.into()); + map.insert("checksum_matched".to_string(), checksum_matched.into()); if let Some(country_code) = country_code.as_ref() { map.insert("country".to_string(), country_code.clone().into()); } diff --git a/crates/collab/src/clickhouse.rs b/crates/collab/src/clickhouse.rs deleted file mode 100644 index 2937116ba..000000000 --- a/crates/collab/src/clickhouse.rs +++ /dev/null @@ -1,28 +0,0 @@ -use serde::Serialize; - -/// Writes the given rows to the specified Clickhouse table. -pub async fn write_to_table( - table: &str, - rows: &[T], - clickhouse_client: &clickhouse::Client, -) -> anyhow::Result<()> { - if rows.is_empty() { - return Ok(()); - } - - let mut insert = clickhouse_client.insert(table)?; - - for event in rows { - insert.write(event).await?; - } - - insert.end().await?; - - let event_count = rows.len(); - log::info!( - "wrote {event_count} {event_specifier} to '{table}'", - event_specifier = if event_count == 1 { "event" } else { "events" } - ); - - Ok(()) -} diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 9c87b6982..86e0112f0 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -1,7 +1,6 @@ pub mod api; pub mod auth; mod cents; -pub mod clickhouse; pub mod db; pub mod env; pub mod executor; @@ -151,10 +150,6 @@ pub struct Config { pub seed_path: Option, pub database_max_connections: u32, pub api_token: String, - pub clickhouse_url: Option, - pub clickhouse_user: Option, - pub clickhouse_password: Option, - pub clickhouse_database: Option, pub invite_link_prefix: String, pub livekit_server: Option, pub livekit_key: Option, @@ -236,10 +231,6 @@ impl Config { prediction_api_url: None, prediction_api_key: None, prediction_model: None, - clickhouse_url: None, - clickhouse_user: None, - clickhouse_password: None, - clickhouse_database: None, zed_client_checksum_seed: None, slack_panics_webhook: None, auto_join_channel_id: None, @@ -289,7 +280,6 @@ pub struct AppState { pub stripe_billing: Option>, pub rate_limiter: Arc, pub executor: Executor, - pub clickhouse_client: Option<::clickhouse::Client>, pub kinesis_client: Option<::aws_sdk_kinesis::Client>, pub config: Config, } @@ -343,10 +333,6 @@ impl AppState { stripe_client, rate_limiter: Arc::new(RateLimiter::new(db)), executor, - clickhouse_client: config - .clickhouse_url - .as_ref() - .and_then(|_| build_clickhouse_client(&config).log_err()), kinesis_client: if config.kinesis_access_key.is_some() { build_kinesis_client(&config).await.log_err() } else { @@ -429,31 +415,3 @@ async fn build_kinesis_client(config: &Config) -> anyhow::Result anyhow::Result<::clickhouse::Client> { - Ok(::clickhouse::Client::default() - .with_url( - config - .clickhouse_url - .as_ref() - .ok_or_else(|| anyhow!("missing clickhouse_url"))?, - ) - .with_user( - config - .clickhouse_user - .as_ref() - .ok_or_else(|| anyhow!("missing clickhouse_user"))?, - ) - .with_password( - config - .clickhouse_password - .as_ref() - .ok_or_else(|| anyhow!("missing clickhouse_password"))?, - ) - .with_database( - config - .clickhouse_database - .as_ref() - .ok_or_else(|| anyhow!("missing clickhouse_database"))?, - )) -} diff --git a/crates/collab/src/llm.rs b/crates/collab/src/llm.rs index 22eb35305..efa8e8916 100644 --- a/crates/collab/src/llm.rs +++ b/crates/collab/src/llm.rs @@ -1,14 +1,11 @@ mod authorization; pub mod db; -mod telemetry; mod token; use crate::api::events::SnowflakeRow; use crate::api::CloudflareIpCountryHeader; use crate::build_kinesis_client; -use crate::{ - build_clickhouse_client, db::UserId, executor::Executor, Cents, Config, Error, Result, -}; +use crate::{db::UserId, executor::Executor, Cents, Config, Error, Result}; use anyhow::{anyhow, Context as _}; use authorization::authorize_access_to_language_model; use axum::routing::get; @@ -40,7 +37,6 @@ use std::{ task::{Context, Poll}, }; use strum::IntoEnumIterator; -use telemetry::{report_llm_rate_limit, report_llm_usage, LlmRateLimitEventRow, LlmUsageEventRow}; use tokio::sync::RwLock; use util::ResultExt; @@ -52,7 +48,6 @@ pub struct LlmState { pub db: Arc, pub http_client: ReqwestClient, pub kinesis_client: Option, - pub clickhouse_client: Option, active_user_count_by_model: RwLock, ActiveUserCount)>>, } @@ -89,10 +84,6 @@ impl LlmState { } else { None }, - clickhouse_client: config - .clickhouse_url - .as_ref() - .and_then(|_| build_clickhouse_client(&config).log_err()), active_user_count_by_model: RwLock::new(HashMap::default()), config, }; @@ -630,34 +621,6 @@ async fn check_usage_limit( .await .log_err(); - if let Some(client) = state.clickhouse_client.as_ref() { - report_llm_rate_limit( - client, - LlmRateLimitEventRow { - time: Utc::now().timestamp_millis(), - user_id: claims.user_id as i32, - is_staff: claims.is_staff, - plan: match claims.plan { - Plan::Free => "free".to_string(), - Plan::ZedPro => "zed_pro".to_string(), - }, - model: model.name.clone(), - provider: provider.to_string(), - usage_measure: resource.to_string(), - requests_this_minute: usage.requests_this_minute as u64, - tokens_this_minute: usage.tokens_this_minute as u64, - tokens_this_day: usage.tokens_this_day as u64, - users_in_recent_minutes: users_in_recent_minutes as u64, - users_in_recent_days: users_in_recent_days as u64, - max_requests_per_minute: per_user_max_requests_per_minute as u64, - max_tokens_per_minute: per_user_max_tokens_per_minute as u64, - max_tokens_per_day: per_user_max_tokens_per_day as u64, - }, - ) - .await - .log_err(); - } - return Err(Error::http( StatusCode::TOO_MANY_REQUESTS, format!("Rate limit exceeded. Maximum {} reached.", resource), @@ -765,44 +728,6 @@ impl Drop for TokenCountingStream { .write(&state.kinesis_client, &state.config.kinesis_stream) .await .log_err(); - - if let Some(clickhouse_client) = state.clickhouse_client.as_ref() { - report_llm_usage( - clickhouse_client, - LlmUsageEventRow { - time: Utc::now().timestamp_millis(), - user_id: claims.user_id as i32, - is_staff: claims.is_staff, - plan: match claims.plan { - Plan::Free => "free".to_string(), - Plan::ZedPro => "zed_pro".to_string(), - }, - model, - provider: provider.to_string(), - input_token_count: tokens.input as u64, - cache_creation_input_token_count: tokens.input_cache_creation as u64, - cache_read_input_token_count: tokens.input_cache_read as u64, - output_token_count: tokens.output as u64, - requests_this_minute: usage.requests_this_minute as u64, - tokens_this_minute: usage.tokens_this_minute as u64, - tokens_this_day: usage.tokens_this_day as u64, - input_tokens_this_month: usage.tokens_this_month.input as u64, - cache_creation_input_tokens_this_month: usage - .tokens_this_month - .input_cache_creation - as u64, - cache_read_input_tokens_this_month: usage - .tokens_this_month - .input_cache_read - as u64, - output_tokens_this_month: usage.tokens_this_month.output as u64, - spending_this_month: usage.spending_this_month.0 as u64, - lifetime_spending: usage.lifetime_spending.0 as u64, - }, - ) - .await - .log_err(); - } } }) } diff --git a/crates/collab/src/llm/telemetry.rs b/crates/collab/src/llm/telemetry.rs deleted file mode 100644 index 9daaaf303..000000000 --- a/crates/collab/src/llm/telemetry.rs +++ /dev/null @@ -1,65 +0,0 @@ -use anyhow::{Context, Result}; -use serde::Serialize; - -use crate::clickhouse::write_to_table; - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct LlmUsageEventRow { - pub time: i64, - pub user_id: i32, - pub is_staff: bool, - pub plan: String, - pub model: String, - pub provider: String, - pub input_token_count: u64, - pub cache_creation_input_token_count: u64, - pub cache_read_input_token_count: u64, - pub output_token_count: u64, - pub requests_this_minute: u64, - pub tokens_this_minute: u64, - pub tokens_this_day: u64, - pub input_tokens_this_month: u64, - pub cache_creation_input_tokens_this_month: u64, - pub cache_read_input_tokens_this_month: u64, - pub output_tokens_this_month: u64, - pub spending_this_month: u64, - pub lifetime_spending: u64, -} - -#[derive(Serialize, Debug, clickhouse::Row)] -pub struct LlmRateLimitEventRow { - pub time: i64, - pub user_id: i32, - pub is_staff: bool, - pub plan: String, - pub model: String, - pub provider: String, - pub usage_measure: String, - pub requests_this_minute: u64, - pub tokens_this_minute: u64, - pub tokens_this_day: u64, - pub users_in_recent_minutes: u64, - pub users_in_recent_days: u64, - pub max_requests_per_minute: u64, - pub max_tokens_per_minute: u64, - pub max_tokens_per_day: u64, -} - -pub async fn report_llm_usage(client: &clickhouse::Client, row: LlmUsageEventRow) -> Result<()> { - const LLM_USAGE_EVENTS_TABLE: &str = "llm_usage_events"; - write_to_table(LLM_USAGE_EVENTS_TABLE, &[row], client) - .await - .with_context(|| format!("failed to upload to table '{LLM_USAGE_EVENTS_TABLE}'"))?; - Ok(()) -} - -pub async fn report_llm_rate_limit( - client: &clickhouse::Client, - row: LlmRateLimitEventRow, -) -> Result<()> { - const LLM_RATE_LIMIT_EVENTS_TABLE: &str = "llm_rate_limit_events"; - write_to_table(LLM_RATE_LIMIT_EVENTS_TABLE, &[row], client) - .await - .with_context(|| format!("failed to upload to table '{LLM_RATE_LIMIT_EVENTS_TABLE}'"))?; - Ok(()) -} diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 91e103510..39236f168 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -518,7 +518,6 @@ impl TestServer { stripe_billing: None, rate_limiter: Arc::new(RateLimiter::new(test_db.db().clone())), executor, - clickhouse_client: None, kinesis_client: None, config: Config { http_port: 0, @@ -549,10 +548,6 @@ impl TestServer { prediction_api_url: None, prediction_api_key: None, prediction_model: None, - clickhouse_url: None, - clickhouse_user: None, - clickhouse_password: None, - clickhouse_database: None, zed_client_checksum_seed: None, slack_panics_webhook: None, auto_join_channel_id: None, diff --git a/docs/src/telemetry.md b/docs/src/telemetry.md index 7267bbdc0..20018b920 100644 --- a/docs/src/telemetry.md +++ b/docs/src/telemetry.md @@ -22,8 +22,8 @@ The telemetry settings can also be configured via the welcome screen, which can Telemetry is sent from the application to our servers. Data is proxied through our servers to enable us to easily switch analytics services. We currently use: - [Axiom](https://axiom.co): Cloud-monitoring service - stores diagnostic events -- [Clickhouse](https://clickhouse.com): Business Intelligence platform - stores both diagnostic and metric events -- [Metabase](https://www.metabase.com): Dashboards - dashboards built around data pulled from Clickhouse +- [Snowflake](https://snowflake.com): Business Intelligence platform - stores both diagnostic and metric events +- [Metabase](https://www.metabase.com): Dashboards - dashboards built around data pulled from Snowflake ## Types of Telemetry diff --git a/legal/subprocessors.md b/legal/subprocessors.md index 286d1fc80..db55f8445 100644 --- a/legal/subprocessors.md +++ b/legal/subprocessors.md @@ -10,13 +10,14 @@ This page provides information about the Subprocessors Zed has engaged to provid | Cloudflare | Cloud Infrastructure | Worldwide | | Vercel | Cloud Infrastructure | United States | | DigitalOcean | Cloud Infrastructure | United States | +| AWS | Cloud Infrastructure | United States | | ConvertKit | Email Marketing | United States | | Axiom | Analytics | United States | -| ClickHouse | Analytics | United States | +| Snowflake | Analytics | United States | | Metabase | Analytics | United States | | GitHub | Authentication | United States | | LiveKit | Audio Conferencing | United States | | Anthropic | AI Services | United States | | OpenAI | AI Services | United States | -**DATE: August 19, 2024** +**DATE: December 9, 2024** diff --git a/typos.toml b/typos.toml index 50f3aadd0..a1788fff5 100644 --- a/typos.toml +++ b/typos.toml @@ -54,9 +54,6 @@ extend-ignore-re = [ '"ba"', # :/ crates/collab/migrations/20231009181554_add_release_channel_to_rooms.sql "COLUMN enviroment", - # Typo in ClickHouse column name. - # crates/collab/src/api/events.rs - "rename = \"sesssion_id\"", "doas", # ProtoLS crate with tree-sitter Protobuf grammar. "protols",