Remove the analytics code
parent
db7cec52bb
commit
25dac2b862
|
@ -58,49 +58,3 @@ jobs:
|
|||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
DISTRIBUTION_ID: ${{ secrets.AWS_CLOUDFRONT_DISTRIBUTION_ID }}
|
||||
|
||||
deploy-analytics-lambda:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout the Repository
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Install the Stable Rust Toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
toolchain: stable
|
||||
override: true
|
||||
|
||||
- name: Setup Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
|
||||
- name: Install Zig Toolchain
|
||||
uses: korandoru/setup-zig@v1
|
||||
with:
|
||||
zig-version: 0.10.0
|
||||
|
||||
- name: Install Cargo Lambda
|
||||
uses: jaxxstorm/action-install-gh-release@v1.9.0
|
||||
with:
|
||||
repo: cargo-lambda/cargo-lambda
|
||||
|
||||
- name: Build Lambda Function
|
||||
run: cd analytics/lambda && cargo lambda build --release --arm64 --output-format zip
|
||||
|
||||
- name: Configure AWS CLI
|
||||
run: |
|
||||
mkdir ~/.aws
|
||||
echo "[default]" > ~/.aws/config
|
||||
echo "credential_source = Environment" >> ~/.aws/config
|
||||
|
||||
- name: Deploy Lambda Function
|
||||
run: |
|
||||
aws lambda update-function-code --function-name analytics_lambda \
|
||||
--zip-file "fileb://$(pwd)/analytics/lambda/target/lambda/analytics/bootstrap.zip" --publish
|
||||
env:
|
||||
AWS_DEFAULT_REGION: eu-west-1
|
||||
AWS_ACCESS_KEY_ID: "${{ secrets.ANALYTICS_DEPLOYER_ACCESS_KEY_ID }}"
|
||||
AWS_SECRET_ACCESS_KEY: "${{ secrets.ANALYTICS_DEPLOYER_SECRET_ACCESS_KEY }}"
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
[watch]
|
||||
ignore = [
|
||||
"analytics",
|
||||
"cf",
|
||||
"media"
|
||||
]
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
/target
|
File diff suppressed because it is too large
Load Diff
|
@ -1,38 +0,0 @@
|
|||
[package]
|
||||
name = "analytics-lambda"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
|
||||
[features]
|
||||
local = []
|
||||
|
||||
[dependencies]
|
||||
async-trait = { version = "0.1" }
|
||||
env_logger = { version = "0.10" }
|
||||
fernet = { version = "0.2" }
|
||||
lambda_runtime = "0.8"
|
||||
log = { version = "0.4" }
|
||||
openssl = { version = "0.10", features = ["vendored"] }
|
||||
poem = { version = "1.3" }
|
||||
poem-lambda = { version = "1.3" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0" }
|
||||
time = { version = "0.3", features = ["formatting", "serde"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
toml = { version = "0.8" }
|
||||
tracing = { version = "0.1" }
|
||||
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "tracing-log"] }
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
|
||||
analytics-model = { path = "../model" }
|
||||
|
||||
[dependencies.sqlx]
|
||||
version = "0.7"
|
||||
features = [
|
||||
"migrate",
|
||||
"postgres",
|
||||
"runtime-tokio-rustls",
|
||||
"time",
|
||||
"uuid"
|
||||
]
|
|
@ -1,26 +0,0 @@
|
|||
#!/bin/bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
DB_CONTAINER_NAME="blakerain-analytics-db"
|
||||
DB_CONTAINER_PORT=5101
|
||||
|
||||
# Stop the database docker container (if it is already running).
|
||||
docker stop "$DB_CONTAINER_NAME" || true
|
||||
|
||||
# Start the local database, passing in defaults that correspond to those in 'local.toml'
|
||||
# configuration file.
|
||||
docker run --rm --name "$DB_CONTAINER_NAME" -d \
|
||||
-e POSTGRES_USER=analytics_local \
|
||||
-e POSTGRES_PASSWORD=analytics_local \
|
||||
-e POSTGRES_DB=analytics_local \
|
||||
-p $DB_CONTAINER_PORT:5432 \
|
||||
postgres:alpine \
|
||||
-c log_statement=all
|
||||
|
||||
# Make sure that 'cargo watch' is installed
|
||||
cargo install cargo-watch
|
||||
|
||||
# Runt he language function, reloading any changes.
|
||||
cargo watch -B 1 -L debug -- cargo run --features local --bin analytics
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
[db]
|
||||
endpoint = "localhost"
|
||||
port = 5101
|
||||
username = "analytics_local"
|
||||
password = "analytics_local"
|
||||
dbname = "analytics_local"
|
||||
|
||||
[auth]
|
||||
# This is a very poor key, but it shouldn't trigger GitHub
|
||||
token_key = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE="
|
|
@ -1,63 +0,0 @@
|
|||
use analytics_lambda::{
|
||||
config::{load_from_env, load_from_file},
|
||||
endpoints::auth::AuthContext,
|
||||
env::Env,
|
||||
handlers::{
|
||||
auth::{new_password, signin, validate_token},
|
||||
page_view::{append_page_view, record_page_view},
|
||||
query::{query_day_view, query_month_view, query_week_view},
|
||||
},
|
||||
};
|
||||
use analytics_model::MIGRATOR;
|
||||
use lambda_runtime::Error;
|
||||
use poem::{get, middleware, post, Endpoint, EndpointExt, Route};
|
||||
|
||||
async fn create() -> Result<impl Endpoint, Error> {
|
||||
let config = if cfg!(feature = "local") {
|
||||
load_from_file()
|
||||
} else {
|
||||
load_from_env().await
|
||||
}?;
|
||||
|
||||
let env = Env::new(config).await;
|
||||
MIGRATOR.run(&env.pool).await?;
|
||||
|
||||
Ok(Route::new()
|
||||
.at("/page_view", post(record_page_view))
|
||||
.at("/page_view/:id", post(append_page_view))
|
||||
.at("/auth/sign_in", post(signin))
|
||||
.at("/auth/new_password", post(new_password))
|
||||
.at("/auth/validate", post(validate_token))
|
||||
.at("/query/month/:year/:month", get(query_month_view))
|
||||
.at("/query/week/:year/:week", get(query_week_view))
|
||||
.at("/query/day/:year/:month/:day", get(query_day_view))
|
||||
.with(AuthContext::new(&["/auth", "/page_view"], env.clone()))
|
||||
.with(middleware::Cors::new())
|
||||
.with(middleware::Tracing)
|
||||
.data(env))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
let filter_layer = tracing_subscriber::filter::EnvFilter::builder()
|
||||
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(filter_layer)
|
||||
.without_time()
|
||||
.with_ansi(cfg!(feature = "local"))
|
||||
.init();
|
||||
|
||||
let endpoint = create().await?;
|
||||
|
||||
if cfg!(feature = "local") {
|
||||
poem::Server::new(poem::listener::TcpListener::bind("127.0.0.1:3000"))
|
||||
.run(endpoint)
|
||||
.await?;
|
||||
} else {
|
||||
poem_lambda::run(endpoint).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
use analytics_lambda::{
|
||||
config::{load_from_env, load_from_file},
|
||||
env::Env,
|
||||
};
|
||||
use analytics_model::MIGRATOR;
|
||||
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
|
||||
use serde::Deserialize;
|
||||
use sqlx::PgPool;
|
||||
|
||||
async fn destroy(pool: &PgPool) -> sqlx::Result<()> {
|
||||
sqlx::query("DROP SCHEMA public CASCADE")
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE SCHEMA public").execute(pool).await?;
|
||||
|
||||
sqlx::query("GRANT ALL ON SCHEMA public TO analytics")
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("GRANT ALL ON SCHEMA public TO public")
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Options {
|
||||
#[serde(default)]
|
||||
destroy: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
let filter_layer = tracing_subscriber::filter::EnvFilter::builder()
|
||||
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(filter_layer)
|
||||
.without_time()
|
||||
.with_ansi(cfg!(feature = "local"))
|
||||
.init();
|
||||
|
||||
run(service_fn(
|
||||
move |event: LambdaEvent<serde_json::Value>| async move {
|
||||
let options: Options = serde_json::from_value(event.payload).expect("options");
|
||||
|
||||
let config = if cfg!(feature = "local") {
|
||||
load_from_file()
|
||||
} else {
|
||||
load_from_env().await
|
||||
}?;
|
||||
|
||||
let pool = Env::create_pool(&config).await;
|
||||
|
||||
if options.destroy {
|
||||
log::info!("Destroying database");
|
||||
destroy(&pool).await?;
|
||||
}
|
||||
|
||||
log::info!("Running migrations");
|
||||
MIGRATOR.run(&pool).await?;
|
||||
|
||||
Ok::<(), Error>(())
|
||||
},
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
use std::io::Read;
|
||||
|
||||
use fernet::Fernet;
|
||||
use lambda_runtime::Error;
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
pub db: DbConfig,
|
||||
pub auth: AuthConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct DbConfig {
|
||||
pub endpoint: String,
|
||||
pub port: Option<u16>,
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub dbname: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AuthConfig {
|
||||
pub token_key: String,
|
||||
}
|
||||
|
||||
pub fn load_from_file() -> Result<Config, Error> {
|
||||
log::info!("Loading configuration from 'local.toml'");
|
||||
let path = std::env::current_dir()?.join("local.toml");
|
||||
if !path.is_file() {
|
||||
log::error!("Local configuration file 'local.toml' not found");
|
||||
return Err("Missing configuration file".into());
|
||||
}
|
||||
|
||||
let mut file = std::fs::File::open(path)?;
|
||||
let mut content = String::new();
|
||||
file.read_to_string(&mut content)?;
|
||||
let config = toml::from_str(&content)?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub async fn load_from_env() -> Result<Config, Error> {
|
||||
let endpoint = std::env::var("DATABASE_ENDPOINT")?;
|
||||
let password = std::env::var("DATABASE_PASSWORD")?;
|
||||
let token_key = std::env::var("TOKEN_KEY").unwrap_or_else(|_| {
|
||||
log::info!("Unable to find TOKEN_KEY environment variable; falling back to generated key");
|
||||
Fernet::generate_key()
|
||||
});
|
||||
|
||||
let db = DbConfig {
|
||||
endpoint,
|
||||
port: None,
|
||||
username: "analytics".to_string(),
|
||||
password,
|
||||
dbname: "analytics".to_string(),
|
||||
};
|
||||
|
||||
let auth = AuthConfig { token_key };
|
||||
|
||||
Ok(Config { db, auth })
|
||||
}
|
|
@ -1,118 +0,0 @@
|
|||
use analytics_model::user::User;
|
||||
use async_trait::async_trait;
|
||||
use fernet::Fernet;
|
||||
use poem::{
|
||||
error::InternalServerError,
|
||||
http::StatusCode,
|
||||
web::headers::{self, authorization::Bearer, HeaderMapExt},
|
||||
Endpoint, Middleware, Request,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::env::Env;
|
||||
|
||||
pub struct AuthContext {
|
||||
skip_prefixes: Vec<String>,
|
||||
env: Env,
|
||||
}
|
||||
|
||||
impl AuthContext {
|
||||
pub fn new(skip_prefixes: &[&str], env: Env) -> Self {
|
||||
Self {
|
||||
skip_prefixes: skip_prefixes.iter().map(ToString::to_string).collect(),
|
||||
env,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Endpoint> Middleware<E> for AuthContext {
|
||||
type Output = AuthEndpoint<E>;
|
||||
|
||||
fn transform(&self, ep: E) -> Self::Output {
|
||||
AuthEndpoint::new(self.skip_prefixes.clone(), self.env.clone(), ep)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AuthEndpoint<E: Endpoint> {
|
||||
skip_prefixes: Vec<String>,
|
||||
env: Env,
|
||||
endpoint: E,
|
||||
}
|
||||
|
||||
impl<E: Endpoint> AuthEndpoint<E> {
|
||||
fn new(skip_prefixes: Vec<String>, env: Env, endpoint: E) -> Self {
|
||||
Self {
|
||||
skip_prefixes,
|
||||
env,
|
||||
endpoint,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<E: Endpoint> Endpoint for AuthEndpoint<E> {
|
||||
type Output = E::Output;
|
||||
|
||||
async fn call(&self, mut request: Request) -> poem::Result<Self::Output> {
|
||||
for skip_prefix in &self.skip_prefixes {
|
||||
if request.uri().path().starts_with(skip_prefix) {
|
||||
return self.endpoint.call(request).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that we have an 'Authorization' header that has a 'Bearer' token.
|
||||
let Some(auth) = request.headers().typed_get::<headers::Authorization<Bearer>>() else {
|
||||
log::info!("Missing 'Authorization' header with 'Bearer' token");
|
||||
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
|
||||
};
|
||||
|
||||
// Ensure that we can decrypt the token using the provided Fernet key.
|
||||
let Token { user_id } = match Token::decode(&self.env.fernet, auth.token()) {
|
||||
Some(token) => token,
|
||||
None => {
|
||||
log::error!("Failed to decode authentication token");
|
||||
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
|
||||
}
|
||||
};
|
||||
|
||||
// If the user no longer exists, then a simple 401 will suffice.
|
||||
let Some(user) = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
|
||||
.bind(user_id).fetch_optional(&self.env.pool).await.map_err(InternalServerError)? else {
|
||||
log::error!("User '{user_id}' no longer exists");
|
||||
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
|
||||
};
|
||||
|
||||
// Make sure that the user is still enabled.
|
||||
if !user.enabled {
|
||||
log::error!("User '{user_id}' is not enabled");
|
||||
return Err(poem::Error::from_status(StatusCode::FORBIDDEN));
|
||||
}
|
||||
|
||||
// Store the authenticated user in the request for retrieval by handlers.
|
||||
request.set_data(user);
|
||||
|
||||
self.endpoint.call(request).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Token {
|
||||
pub user_id: Uuid,
|
||||
}
|
||||
|
||||
impl Token {
|
||||
pub fn new(user_id: Uuid) -> Self {
|
||||
Self { user_id }
|
||||
}
|
||||
|
||||
pub fn encode(&self, fernet: &Fernet) -> String {
|
||||
let plain = serde_json::to_string(self).expect("Unable to JSON encode token");
|
||||
fernet.encrypt(plain.as_bytes())
|
||||
}
|
||||
|
||||
pub fn decode(fernet: &Fernet, encoded: &str) -> Option<Self> {
|
||||
let plain = fernet.decrypt(encoded).ok()?;
|
||||
serde_json::from_slice(&plain).ok()
|
||||
}
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use fernet::Fernet;
|
||||
use log::LevelFilter;
|
||||
use sqlx::postgres::PgConnectOptions;
|
||||
use sqlx::ConnectOptions;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
pub struct Env {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
impl Clone for Env {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: Arc::clone(&self.inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Env {
|
||||
type Target = Inner;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Inner {
|
||||
pub pool: sqlx::PgPool,
|
||||
pub fernet: Fernet,
|
||||
}
|
||||
|
||||
impl Env {
|
||||
pub async fn create_pool(config: &Config) -> sqlx::PgPool {
|
||||
let mut connection_opts = PgConnectOptions::new()
|
||||
.host(&config.db.endpoint)
|
||||
.username(&config.db.username)
|
||||
.password(&config.db.password)
|
||||
.database(&config.db.dbname)
|
||||
.log_statements(LevelFilter::Debug)
|
||||
.log_slow_statements(LevelFilter::Warn, Duration::from_secs(1));
|
||||
|
||||
if let Some(port) = config.db.port {
|
||||
connection_opts = connection_opts.port(port);
|
||||
}
|
||||
|
||||
sqlx::PgPool::connect_with(connection_opts).await.unwrap()
|
||||
}
|
||||
|
||||
pub async fn new(config: Config) -> Self {
|
||||
let pool = Self::create_pool(&config).await;
|
||||
let inner = Inner {
|
||||
pool,
|
||||
fernet: Fernet::new(&config.auth.token_key).expect("valid fernet key"),
|
||||
};
|
||||
|
||||
Self {
|
||||
inner: Arc::new(inner),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,3 +0,0 @@
|
|||
pub mod auth;
|
||||
pub mod page_view;
|
||||
pub mod query;
|
|
@ -1,111 +0,0 @@
|
|||
use analytics_model::user::{authenticate, reset_password, User};
|
||||
use poem::{
|
||||
error::InternalServerError,
|
||||
handler,
|
||||
web::{Data, Json},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{endpoints::auth::Token, env::Env};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct SignInBody {
|
||||
username: String,
|
||||
password: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum SignInResponse {
|
||||
InvalidCredentials,
|
||||
NewPassword,
|
||||
Successful { token: String },
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct NewPasswordBody {
|
||||
username: String,
|
||||
#[serde(rename = "oldPassword")]
|
||||
old_password: String,
|
||||
#[serde(rename = "newPassword")]
|
||||
new_password: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct ValidateTokenBody {
|
||||
token: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum ValidateTokenResponse {
|
||||
Invalid,
|
||||
Valid { token: String },
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn signin(
|
||||
env: Data<&Env>,
|
||||
Json(SignInBody { username, password }): Json<SignInBody>,
|
||||
) -> poem::Result<Json<SignInResponse>> {
|
||||
let Some(user) = authenticate(&env.pool, &username, &password).await.map_err(InternalServerError)? else {
|
||||
return Ok(Json(SignInResponse::InvalidCredentials));
|
||||
};
|
||||
|
||||
if user.reset_password {
|
||||
return Ok(Json(SignInResponse::NewPassword));
|
||||
}
|
||||
|
||||
let token = Token::new(user.id);
|
||||
let token = token.encode(&env.fernet);
|
||||
Ok(Json(SignInResponse::Successful { token }))
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn new_password(
|
||||
env: Data<&Env>,
|
||||
Json(NewPasswordBody {
|
||||
username,
|
||||
old_password,
|
||||
new_password,
|
||||
}): Json<NewPasswordBody>,
|
||||
) -> poem::Result<Json<SignInResponse>> {
|
||||
let Some(user) = authenticate(&env.pool, &username, &old_password).await.map_err(InternalServerError)? else {
|
||||
return Ok(Json(SignInResponse::InvalidCredentials));
|
||||
};
|
||||
|
||||
let Some(user) = reset_password(&env.pool, user.id, new_password).await.map_err(InternalServerError)? else {
|
||||
return Ok(Json(SignInResponse::InvalidCredentials));
|
||||
};
|
||||
|
||||
let token = Token::new(user.id);
|
||||
let token = token.encode(&env.fernet);
|
||||
Ok(Json(SignInResponse::Successful { token }))
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn validate_token(
|
||||
env: Data<&Env>,
|
||||
Json(ValidateTokenBody { token }): Json<ValidateTokenBody>,
|
||||
) -> poem::Result<Json<ValidateTokenResponse>> {
|
||||
let Some(Token { user_id }) = Token::decode(&env.fernet, &token) else {
|
||||
log::error!("Failed to decode authentication token");
|
||||
return Ok(Json(ValidateTokenResponse::Invalid));
|
||||
};
|
||||
|
||||
let Some(user) = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
|
||||
.bind(user_id).fetch_optional(&env.pool).await.map_err(InternalServerError)? else {
|
||||
log::error!("User '{user_id}' no longer exists");
|
||||
return Ok(Json(ValidateTokenResponse::Invalid));
|
||||
};
|
||||
|
||||
if !user.enabled {
|
||||
log::error!("User '{user_id}' is not enabled");
|
||||
return Ok(Json(ValidateTokenResponse::Invalid));
|
||||
}
|
||||
|
||||
let token = Token::new(user.id);
|
||||
let token = token.encode(&env.fernet);
|
||||
|
||||
Ok(Json(ValidateTokenResponse::Valid { token }))
|
||||
}
|
|
@ -1,92 +0,0 @@
|
|||
use analytics_model::view::{self, create_page_view, PageView};
|
||||
use poem::{
|
||||
handler,
|
||||
web::{Data, Json, Path},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::env::Env;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct PageViewBody {
|
||||
path: Option<String>,
|
||||
ua: Option<String>,
|
||||
vw: Option<i32>,
|
||||
vh: Option<i32>,
|
||||
sw: Option<i32>,
|
||||
sh: Option<i32>,
|
||||
tz: Option<String>,
|
||||
rf: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct PageViewResponse {
|
||||
id: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn record_page_view(
|
||||
env: Data<&Env>,
|
||||
Json(PageViewBody {
|
||||
path,
|
||||
ua,
|
||||
vw,
|
||||
vh,
|
||||
sw,
|
||||
sh,
|
||||
tz,
|
||||
rf,
|
||||
}): Json<PageViewBody>,
|
||||
) -> poem::Result<Json<PageViewResponse>> {
|
||||
let id = if let Some(path) = path {
|
||||
let id = Uuid::new_v4();
|
||||
let view = PageView {
|
||||
id,
|
||||
path,
|
||||
time: OffsetDateTime::now_utc(),
|
||||
user_agent: ua,
|
||||
viewport_width: vw,
|
||||
viewport_height: vh,
|
||||
screen_width: sw,
|
||||
screen_height: sh,
|
||||
timezone: tz,
|
||||
referrer: rf,
|
||||
beacon: false,
|
||||
duration: None,
|
||||
scroll: None,
|
||||
};
|
||||
|
||||
if let Err(err) = create_page_view(&env.pool, view).await {
|
||||
log::error!("Failed to record page view: {err:?}");
|
||||
None
|
||||
} else {
|
||||
Some(id)
|
||||
}
|
||||
} else {
|
||||
log::info!("Ignoring request for pageview image with no path");
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Json(PageViewResponse { id }))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct AppendPageViewBody {
|
||||
duration: f64,
|
||||
scroll: f64,
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn append_page_view(
|
||||
env: Data<&Env>,
|
||||
Path(id): Path<Uuid>,
|
||||
Json(AppendPageViewBody { duration, scroll }): Json<AppendPageViewBody>,
|
||||
) -> poem::Result<Json<()>> {
|
||||
if let Err(err) = view::append_page_view(&env.pool, id, duration, scroll).await {
|
||||
log::error!("Failed to append page view: {err:?}");
|
||||
}
|
||||
|
||||
Ok(Json(()))
|
||||
}
|
|
@ -1,48 +0,0 @@
|
|||
use analytics_model::{
|
||||
query::{query_page_views, PageViewsQueryResult},
|
||||
user::User,
|
||||
view::{PageViewsDay, PageViewsMonth, PageViewsWeek},
|
||||
};
|
||||
use poem::{
|
||||
error::InternalServerError,
|
||||
handler,
|
||||
web::{Data, Json, Path},
|
||||
};
|
||||
|
||||
use crate::env::Env;
|
||||
|
||||
#[handler]
|
||||
pub async fn query_month_view(
|
||||
env: Data<&Env>,
|
||||
_: Data<&User>,
|
||||
Path(path): Path<(i32, i32)>,
|
||||
) -> poem::Result<Json<PageViewsQueryResult<PageViewsMonth>>> {
|
||||
query_page_views(&env.pool, path)
|
||||
.await
|
||||
.map_err(InternalServerError)
|
||||
.map(Json)
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn query_week_view(
|
||||
env: Data<&Env>,
|
||||
_: Data<&User>,
|
||||
Path(path): Path<(i32, i32)>,
|
||||
) -> poem::Result<Json<PageViewsQueryResult<PageViewsWeek>>> {
|
||||
query_page_views(&env.pool, path)
|
||||
.await
|
||||
.map_err(InternalServerError)
|
||||
.map(Json)
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn query_day_view(
|
||||
env: Data<&Env>,
|
||||
_: Data<&User>,
|
||||
Path(path): Path<(i32, i32, i32)>,
|
||||
) -> poem::Result<Json<PageViewsQueryResult<PageViewsDay>>> {
|
||||
query_page_views(&env.pool, path)
|
||||
.await
|
||||
.map_err(InternalServerError)
|
||||
.map(Json)
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
pub mod config;
|
||||
pub mod env;
|
||||
pub mod handlers;
|
||||
|
||||
pub mod endpoints {
|
||||
pub mod auth;
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
/target
|
File diff suppressed because it is too large
Load Diff
|
@ -1,23 +0,0 @@
|
|||
[package]
|
||||
name = "analytics-model"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
log = { version = "0.4" }
|
||||
pbkdf2 = { version = "0.12", features = ["simple"] }
|
||||
rand_core = { version = "0.6", features = ["std"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
time = { version = "0.3", features = ["formatting", "serde"] }
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
|
||||
[dependencies.sqlx]
|
||||
version = "0.7"
|
||||
features = [
|
||||
"migrate",
|
||||
"postgres",
|
||||
"runtime-tokio-rustls",
|
||||
"time",
|
||||
"uuid"
|
||||
]
|
|
@ -1,61 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS page_views (
|
||||
id UUID NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
path TEXT NOT NULL,
|
||||
time TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
user_agent TEXT,
|
||||
viewport_width INTEGER,
|
||||
viewport_height INTEGER,
|
||||
screen_width INTEGER,
|
||||
screen_height INTEGER,
|
||||
timezone TEXT,
|
||||
referrer TEXT,
|
||||
beacon BOOLEAN NOT NULL,
|
||||
duration FLOAT8,
|
||||
scroll FLOAT8
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS page_views_day (
|
||||
id UUID NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
path TEXT NOT NULL,
|
||||
year INTEGER NOT NULL,
|
||||
month INTEGER NOT NULL,
|
||||
day INTEGER NOT NULL,
|
||||
hour INTEGER NOT NULL,
|
||||
count INTEGER NOT NULL,
|
||||
total_beacon INTEGER NOT NULL,
|
||||
total_scroll FLOAT8 NOT NULL,
|
||||
total_duration FLOAT8 NOT NULL,
|
||||
|
||||
CONSTRAINT unique_page_views_day
|
||||
UNIQUE (path, year, month, day, hour)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS page_views_week (
|
||||
id UUID NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
path TEXT NOT NULL,
|
||||
year INTEGER NOT NULL,
|
||||
week INTEGER NOT NULL,
|
||||
dow INTEGER NOT NULL,
|
||||
count INTEGER NOT NULL,
|
||||
total_beacon INTEGER NOT NULL,
|
||||
total_scroll FLOAT8 NOT NULL,
|
||||
total_duration FLOAT8 NOT NULL,
|
||||
|
||||
CONSTRAINT unique_page_views_week
|
||||
UNIQUE (path, year, week, dow)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS page_views_month (
|
||||
id UUID NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
path TEXT NOT NULL,
|
||||
year INTEGER NOT NULL,
|
||||
month INTEGER NOT NULL,
|
||||
day INTEGER NOT NULL,
|
||||
count INTEGER NOT NULL,
|
||||
total_beacon INTEGER NOT NULL,
|
||||
total_scroll FLOAT8 NOT NULL,
|
||||
total_duration FLOAT8 NOT NULL,
|
||||
|
||||
CONSTRAINT unique_page_views_month
|
||||
UNIQUE (path, year, month, day)
|
||||
);
|
|
@ -1,14 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS users (
|
||||
id UUID NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
username TEXT NOT NULL,
|
||||
password TEXT NOT NULL,
|
||||
enabled BOOLEAN NOT NULL,
|
||||
reset_password BOOLEAN NOT NULL,
|
||||
|
||||
CONSTRAINT unique_username
|
||||
UNIQUE (username)
|
||||
);
|
||||
|
||||
-- Create an intial user that has a temporary password. The password is: admin
|
||||
INSERT INTO users (username, password, enabled, reset_password)
|
||||
VALUES('admin', '$pbkdf2-sha256$i=600000,l=32$V62SYtsc1HWC2hV3jbevjg$OrOHoTwo1YPmNrPUnAUy3Vfg4Lrw90mxOTTISVHmjnk', TRUE, TRUE);
|
|
@ -1,5 +0,0 @@
|
|||
pub mod user;
|
||||
pub mod view;
|
||||
pub mod query;
|
||||
|
||||
pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
|
|
@ -1,155 +0,0 @@
|
|||
use serde::Serialize;
|
||||
use sqlx::{query_builder::Separated, PgPool, QueryBuilder};
|
||||
|
||||
use crate::view::{PageViewsDay, PageViewsMonth, PageViewsWeek};
|
||||
|
||||
#[derive(Debug, Default, Clone, sqlx::FromRow, Serialize)]
|
||||
pub struct PageViewsPathCount {
|
||||
pub path: String,
|
||||
pub count: i64,
|
||||
pub beacons: i64,
|
||||
pub total_duration: f64,
|
||||
pub total_scroll: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct PageViewsQueryResult<T> {
|
||||
pub site: PageViewsPathCount,
|
||||
pub views: Vec<T>,
|
||||
pub paths: Vec<PageViewsPathCount>,
|
||||
}
|
||||
|
||||
pub trait PageViewsQuery {
|
||||
type Args;
|
||||
|
||||
fn table_name() -> &'static str;
|
||||
fn order_column() -> &'static str;
|
||||
fn add_conditions(cond: &mut Separated<sqlx::Postgres, &str>, args: &Self::Args);
|
||||
}
|
||||
|
||||
impl PageViewsQuery for PageViewsDay {
|
||||
type Args = (i32, i32, i32);
|
||||
|
||||
fn table_name() -> &'static str {
|
||||
"page_views_day"
|
||||
}
|
||||
|
||||
fn order_column() -> &'static str {
|
||||
"hour"
|
||||
}
|
||||
|
||||
fn add_conditions(cond: &mut Separated<sqlx::Postgres, &str>, (year, month, day): &Self::Args) {
|
||||
cond.push("year = ");
|
||||
cond.push_bind_unseparated(*year);
|
||||
cond.push("month = ");
|
||||
cond.push_bind_unseparated(*month);
|
||||
cond.push("day = ");
|
||||
cond.push_bind_unseparated(*day);
|
||||
}
|
||||
}
|
||||
|
||||
impl PageViewsQuery for PageViewsWeek {
|
||||
type Args = (i32, i32);
|
||||
|
||||
fn table_name() -> &'static str {
|
||||
"page_views_week"
|
||||
}
|
||||
|
||||
fn order_column() -> &'static str {
|
||||
"dow"
|
||||
}
|
||||
|
||||
fn add_conditions(cond: &mut Separated<sqlx::Postgres, &str>, (year, week): &Self::Args) {
|
||||
cond.push("year = ");
|
||||
cond.push_bind_unseparated(*year);
|
||||
cond.push("week = ");
|
||||
cond.push_bind_unseparated(*week);
|
||||
}
|
||||
}
|
||||
|
||||
impl PageViewsQuery for PageViewsMonth {
|
||||
type Args = (i32, i32);
|
||||
|
||||
fn table_name() -> &'static str {
|
||||
"page_views_month"
|
||||
}
|
||||
|
||||
fn order_column() -> &'static str {
|
||||
"day"
|
||||
}
|
||||
|
||||
fn add_conditions(cond: &mut Separated<sqlx::Postgres, &str>, (year, month): &Self::Args) {
|
||||
cond.push("year = ");
|
||||
cond.push_bind_unseparated(*year);
|
||||
cond.push("month = ");
|
||||
cond.push_bind_unseparated(*month);
|
||||
}
|
||||
}
|
||||
|
||||
async fn query_page_view_rows<T>(pool: &PgPool, path: &str, args: &T::Args) -> sqlx::Result<Vec<T>>
|
||||
where
|
||||
T: PageViewsQuery + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
|
||||
{
|
||||
let mut query = QueryBuilder::new("SELECT * FROM ");
|
||||
query.push(T::table_name());
|
||||
query.push(" WHERE ");
|
||||
|
||||
{
|
||||
let mut cond = query.separated(" AND ");
|
||||
cond.push("path = ");
|
||||
cond.push_bind_unseparated(path);
|
||||
T::add_conditions(&mut cond, args);
|
||||
}
|
||||
|
||||
query.push(" ORDER BY ");
|
||||
query.push(T::order_column());
|
||||
|
||||
query.build_query_as::<T>().fetch_all(pool).await
|
||||
}
|
||||
|
||||
async fn query_page_views_paths<T>(
|
||||
pool: &PgPool,
|
||||
args: &T::Args,
|
||||
) -> sqlx::Result<Vec<PageViewsPathCount>>
|
||||
where
|
||||
T: PageViewsQuery,
|
||||
{
|
||||
let mut query = QueryBuilder::new("SELECT ");
|
||||
query.push("path, ");
|
||||
query.push("SUM(count) AS count, ");
|
||||
query.push("SUM(total_beacon) AS beacons, ");
|
||||
query.push("SUM(total_duration) AS total_duration, ");
|
||||
query.push("SUM(total_scroll) AS total_scroll FROM ");
|
||||
query.push(T::table_name());
|
||||
query.push(" WHERE ");
|
||||
|
||||
{
|
||||
let mut cond = query.separated(" AND ");
|
||||
T::add_conditions(&mut cond, args);
|
||||
}
|
||||
|
||||
query.push(" GROUP BY path");
|
||||
|
||||
query
|
||||
.build_query_as::<PageViewsPathCount>()
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn query_page_views<T>(
|
||||
pool: &PgPool,
|
||||
args: T::Args,
|
||||
) -> sqlx::Result<PageViewsQueryResult<T>>
|
||||
where
|
||||
T: PageViewsQuery + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
|
||||
{
|
||||
let views = query_page_view_rows(pool, "", &args).await?;
|
||||
let mut paths = query_page_views_paths::<T>(pool, &args).await?;
|
||||
let site = if let Some(index) = paths.iter().position(|count| count.path.is_empty()) {
|
||||
paths.swap_remove(index)
|
||||
} else {
|
||||
PageViewsPathCount::default()
|
||||
};
|
||||
|
||||
Ok(PageViewsQueryResult { site, views, paths })
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
use pbkdf2::{
|
||||
password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
|
||||
Pbkdf2,
|
||||
};
|
||||
use rand_core::OsRng;
|
||||
use serde::Serialize;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
|
||||
pub struct User {
|
||||
pub id: Uuid,
|
||||
pub username: String,
|
||||
#[serde(skip)]
|
||||
pub password: String,
|
||||
pub enabled: bool,
|
||||
pub reset_password: bool,
|
||||
}
|
||||
|
||||
pub async fn authenticate(
|
||||
pool: &PgPool,
|
||||
username: &str,
|
||||
password: &str,
|
||||
) -> sqlx::Result<Option<User>> {
|
||||
let user: User = if let Some(user) = sqlx::query_as("SELECT * FROM users WHERE username = $1")
|
||||
.bind(username)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
{
|
||||
user
|
||||
} else {
|
||||
log::warn!("User not found with username '{username}'");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let parsed_hash = PasswordHash::new(&user.password).expect("valid password hash");
|
||||
if let Err(err) = Pbkdf2.verify_password(password.as_bytes(), &parsed_hash) {
|
||||
log::error!(
|
||||
"Incorrect password for user '{username}' ('{}'): {err:?}",
|
||||
user.id
|
||||
);
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if !user.enabled {
|
||||
log::error!("User '{username}' ('{}') is disabled", user.id);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(user))
|
||||
}
|
||||
|
||||
pub async fn reset_password(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
new_password: String,
|
||||
) -> sqlx::Result<Option<User>> {
|
||||
let salt = SaltString::generate(&mut OsRng);
|
||||
let password = Pbkdf2
|
||||
.hash_password(new_password.as_bytes(), &salt)
|
||||
.expect("valid password hash")
|
||||
.to_string();
|
||||
|
||||
sqlx::query_as(
|
||||
"UPDATE users SET password = $1, reset_password = FALSE WHERE id = $2 RETURNING *",
|
||||
)
|
||||
.bind(password)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
}
|
|
@ -1,331 +0,0 @@
|
|||
use serde::Serialize;
|
||||
use sqlx::PgPool;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow)]
|
||||
pub struct PageView {
|
||||
pub id: Uuid,
|
||||
pub path: String,
|
||||
pub time: OffsetDateTime,
|
||||
pub user_agent: Option<String>,
|
||||
pub viewport_width: Option<i32>,
|
||||
pub viewport_height: Option<i32>,
|
||||
pub screen_width: Option<i32>,
|
||||
pub screen_height: Option<i32>,
|
||||
pub timezone: Option<String>,
|
||||
pub referrer: Option<String>,
|
||||
pub beacon: bool,
|
||||
pub duration: Option<f64>,
|
||||
pub scroll: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
|
||||
pub struct PageViewsDay {
|
||||
pub id: Uuid,
|
||||
pub path: String,
|
||||
pub year: i32,
|
||||
pub month: i32,
|
||||
pub day: i32,
|
||||
pub hour: i32,
|
||||
pub count: i32,
|
||||
pub total_beacon: i32,
|
||||
pub total_scroll: f64,
|
||||
pub total_duration: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
|
||||
pub struct PageViewsWeek {
|
||||
pub id: Uuid,
|
||||
pub path: String,
|
||||
pub year: i32,
|
||||
pub week: i32,
|
||||
pub dow: i32,
|
||||
pub count: i32,
|
||||
pub total_beacon: i32,
|
||||
pub total_scroll: f64,
|
||||
pub total_duration: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
|
||||
pub struct PageViewsMonth {
|
||||
pub id: Uuid,
|
||||
pub path: String,
|
||||
pub year: i32,
|
||||
pub month: i32,
|
||||
pub day: i32,
|
||||
pub count: i32,
|
||||
pub total_beacon: i32,
|
||||
pub total_scroll: f64,
|
||||
pub total_duration: f64,
|
||||
}
|
||||
|
||||
pub async fn create_page_view(pool: &PgPool, view: PageView) -> sqlx::Result<()> {
|
||||
sqlx::query(
|
||||
"INSERT INTO page_views
|
||||
(id, path, time, user_agent,
|
||||
viewport_width, viewport_height,
|
||||
screen_width, screen_height,
|
||||
timezone, referrer,
|
||||
beacon, duration, scroll)
|
||||
VALUES ($1, $2, $3, $4,
|
||||
$5, $6,
|
||||
$7, $8,
|
||||
$9, $10,
|
||||
$11, $12, $13)",
|
||||
)
|
||||
.bind(view.id)
|
||||
.bind(&view.path)
|
||||
.bind(view.time)
|
||||
.bind(view.user_agent)
|
||||
.bind(view.viewport_width)
|
||||
.bind(view.viewport_height)
|
||||
.bind(view.screen_width)
|
||||
.bind(view.screen_height)
|
||||
.bind(view.timezone)
|
||||
.bind(view.referrer)
|
||||
.bind(view.beacon)
|
||||
.bind(view.duration)
|
||||
.bind(view.scroll)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
update_count_accumulators(pool, &view.path, view.time).await?;
|
||||
update_count_accumulators(pool, "", view.time).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_count_accumulators(
|
||||
pool: &PgPool,
|
||||
path: &str,
|
||||
time: OffsetDateTime,
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query(
|
||||
"
|
||||
INSERT INTO page_views_day
|
||||
(path, year, month, day, hour, count, total_beacon, total_scroll, total_duration)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5, 1, 0, 0, 0)
|
||||
ON CONFLICT (path, year, month, day, hour)
|
||||
DO UPDATE SET
|
||||
count = page_views_day.count + 1
|
||||
",
|
||||
)
|
||||
.bind(path)
|
||||
.bind(time.year())
|
||||
.bind(time.month() as i32)
|
||||
.bind(time.day() as i32)
|
||||
.bind(time.hour() as i32)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"
|
||||
INSERT INTO page_views_week
|
||||
(path, year, week, dow, count, total_beacon, total_scroll, total_duration)
|
||||
VALUES
|
||||
($1, $2, $3, $4, 1, 0, 0, 0)
|
||||
ON CONFLICT (path, year, week, dow)
|
||||
DO UPDATE SET
|
||||
count = page_views_week.count + 1
|
||||
",
|
||||
)
|
||||
.bind(path)
|
||||
.bind(time.year())
|
||||
.bind(time.iso_week() as i32)
|
||||
.bind(time.weekday().number_days_from_sunday() as i32)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"
|
||||
INSERT INTO page_views_month
|
||||
(path, year, month, day, count, total_beacon, total_scroll, total_duration)
|
||||
VALUES
|
||||
($1, $2, $3, $4, 1, 0, 0, 0)
|
||||
ON CONFLICT (path, year, month, day)
|
||||
DO UPDATE SET
|
||||
count = page_views_month.count + 1
|
||||
",
|
||||
)
|
||||
.bind(path)
|
||||
.bind(time.year())
|
||||
.bind(time.month() as i32)
|
||||
.bind(time.day() as i32)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Accumulators {
|
||||
duration: f64,
|
||||
scroll: f64,
|
||||
count_delta: i32,
|
||||
duration_delta: f64,
|
||||
scroll_delta: f64,
|
||||
}
|
||||
|
||||
async fn update_beacon_accumulators(
|
||||
pool: &PgPool,
|
||||
path: &str,
|
||||
time: OffsetDateTime,
|
||||
Accumulators {
|
||||
duration,
|
||||
scroll,
|
||||
count_delta,
|
||||
duration_delta,
|
||||
scroll_delta,
|
||||
}: Accumulators,
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query(
|
||||
"
|
||||
INSERT INTO page_views_day
|
||||
(path, year, month, day, hour, count, total_beacon, total_scroll, total_duration)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5, 1, 1, $6, $7)
|
||||
ON CONFLICT (path, year, month, day, hour)
|
||||
DO UPDATE SET
|
||||
total_beacon = page_views_day.total_beacon + $8,
|
||||
total_scroll = page_views_day.total_scroll + $9,
|
||||
total_duration = page_views_day.total_duration + $10
|
||||
",
|
||||
)
|
||||
.bind(path)
|
||||
.bind(time.year())
|
||||
.bind(time.month() as i32)
|
||||
.bind(time.day() as i32)
|
||||
.bind(time.hour() as i32)
|
||||
.bind(scroll)
|
||||
.bind(duration)
|
||||
.bind(count_delta)
|
||||
.bind(scroll_delta)
|
||||
.bind(duration_delta)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"
|
||||
INSERT INTO page_views_week
|
||||
(path, year, week, dow, count, total_beacon, total_scroll, total_duration)
|
||||
VALUES
|
||||
($1, $2, $3, $4, 1, 1, $5, $6)
|
||||
ON CONFLICT (path, year, week, dow)
|
||||
DO UPDATE SET
|
||||
total_beacon = page_views_week.total_beacon + $7,
|
||||
total_scroll = page_views_week.total_scroll + $8,
|
||||
total_duration = page_views_week.total_duration + $9
|
||||
",
|
||||
)
|
||||
.bind(path)
|
||||
.bind(time.year())
|
||||
.bind(time.iso_week() as i32)
|
||||
.bind(time.weekday().number_days_from_sunday() as i32)
|
||||
.bind(scroll)
|
||||
.bind(duration)
|
||||
.bind(count_delta)
|
||||
.bind(scroll_delta)
|
||||
< |