This vignette demonstrates how to create and configure surveillance tasks using the CS9 framework. CS9 provides a structured approach to building real-time surveillance systems with database integration and automated pipeline execution.
Creating a surveillance task in CS9 involves three main steps: defining the database tables that will store your data, writing the data selector and action functions that make up the task, and configuring the task with its execution parameters.

Before starting this tutorial, you need CS9 properly installed and configured. See vignette("installation") for complete setup instructions, including database configuration and environment variables.
You can verify your setup is working with:
# Check if CS9 environment is properly configured
cs9::check_environment_setup()

For an overview of CS9 concepts and architecture, see vignette("cs9").
First, create a surveillance system instance that will manage your tables and tasks:
# Load the package
library(cs9)
# Initialize the surveillance system
ss <- cs9::SurveillanceSystem_v9$new(
name = "example_surveillance",
implementation_version = "1.0.0"
)
# Verify the system was created
print(ss$name)
print(ss$implementation_version)

Before creating tasks, you need to define the database tables that will store your data. Each table requires an access level (name_access), a data category (name_grouping), a variant name (name_variant), the field types, and the keys that uniquely identify each row; indexes are optional but speed up common queries.
# Define a table for weather data
ss$add_table(
name_access = "anon", # Anonymous access level
name_grouping = "weather", # Data category
name_variant = "daily_data", # Specific variant
field_types = c(
# Temporal fields
"date" = "DATE",
"year" = "INTEGER",
"month" = "INTEGER",
"day" = "INTEGER",
# Geographic fields
"location_code" = "TEXT",
"location_name" = "TEXT",
# Weather measurements
"temperature_max" = "DOUBLE",
"temperature_min" = "DOUBLE",
"precipitation" = "DOUBLE",
"humidity" = "DOUBLE"
),
keys = c("date", "location_code"), # Unique identifier
indexes = list(
"idx_date" = "date",
"idx_location" = "location_code",
"idx_date_location" = c("date", "location_code")
)
)
# Define a table for processed results
ss$add_table(
name_access = "anon",
name_grouping = "weather",
name_variant = "weekly_summary",
field_types = c(
"year_week" = "TEXT",
"location_code" = "TEXT",
"avg_temp_max" = "DOUBLE",
"avg_temp_min" = "DOUBLE",
"total_precipitation" = "DOUBLE",
"data_quality_score" = "DOUBLE"
),
keys = c("year_week", "location_code")
)
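Each registered table is exposed through ss$tables under a name composed from its three parts, name_access_name_grouping_name_variant; this is how the tables are referenced later in this vignette. A quick check:

# Table names follow the pattern <name_access>_<name_grouping>_<name_variant>
"anon_weather_daily_data" %in% names(ss$tables) # expected TRUE
"anon_weather_weekly_summary" %in% names(ss$tables) # expected TRUE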
# Check which tables are available
print("Available tables:")
print(names(ss$tables))

CS9 tasks require two main functions: a data selector and an action function.
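Both functions follow a fixed signature, illustrated by this minimal skeleton (the names my_data_selector and my_action are placeholders):

# Skeleton of the two functions every CS9 task supplies
my_data_selector <- function(argset, tables) {
  # Fetch whatever the action needs and return it as a named list
  list(data = NULL)
}
my_action <- function(data, argset, tables) {
  # Receives the selector's return value as `data`,
  # plus the same argset and table accessors
}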
The data selector function retrieves data needed for analysis:
# Data selector function for weather processing
weather_data_selector <- function(argset, tables) {
# In a real implementation, this would query your database
# For demonstration, we'll create sample data
# Number of days in the requested window
n_days <- as.numeric(as.Date(argset$date_to) - as.Date(argset$date_from)) + 1
sample_data <- data.table::data.table(
date = seq.Date(
from = as.Date(argset$date_from),
to = as.Date(argset$date_to),
by = "day"
),
location_code = rep(argset$location_code, length.out = n_days),
temperature_max = rnorm(n_days, mean = 20, sd = 8),
temperature_min = rnorm(n_days, mean = 10, sd = 5),
precipitation = rgamma(n_days, shape = 2, rate = 4)
)
# Add derived fields
sample_data[, year := as.integer(format(date, "%Y"))]
sample_data[, month := as.integer(format(date, "%m"))]
sample_data[, day := as.integer(format(date, "%d"))]
sample_data[, location_name := paste("Location", location_code)]
sample_data[, humidity := runif(.N, 30, 90)]
return(list(
data = sample_data
))
}
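Because the selector is an ordinary R function, you can smoke-test it outside the pipeline with a hand-built argset; only the fields the function actually reads need to be present:

# Exercise the selector directly with a minimal argset
test_argset <- list(
  date_from = "2024-01-01",
  date_to = "2024-01-07",
  location_code = "LOC001"
)
selected <- weather_data_selector(argset = test_argset, tables = NULL)
str(selected$data)

The action function performs the main analysis: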
# Action function for weather processing
weather_action <- function(data, argset, tables) {
# Process the daily weather data into weekly summaries
# Add week information (%U gives the Sunday-based week number; use %V for ISO weeks)
data$data[, year_week := format(date, "%Y-W%U")]
# Calculate weekly aggregates
weekly_summary <- data$data[, .(
avg_temp_max = mean(temperature_max, na.rm = TRUE),
avg_temp_min = mean(temperature_min, na.rm = TRUE),
total_precipitation = sum(precipitation, na.rm = TRUE),
data_quality_score = 1.0 - sum(is.na(temperature_max) | is.na(temperature_min)) / .N
), by = .(year_week, location_code)]
# Insert results into database table
# In a real implementation, this would use:
# tables$anon_weather_weekly_summary$upsert_data(weekly_summary)
# For demonstration, just print results
cat("Processed weekly weather summary:\n")
print(weekly_summary)
# Log successful completion
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_process_weekly",
paste("Successfully processed", nrow(weekly_summary), "weekly records for", argset$location_code)
)
}

Now configure the task with its execution parameters:
# Add the weather processing task
ss$add_task(
name_grouping = "weather",
name_action = "process",
name_variant = "weekly_summary",
cores = 1, # Single core execution
permission = NULL, # No special permissions needed
# Task structure - one plan per location and date range (three plans)
for_each_plan = list(
location_code = c("LOC001", "LOC002", "LOC003"),
date_from = c("2024-01-01", "2024-02-01", "2024-03-01"),
date_to = c("2024-01-31", "2024-02-28", "2024-03-31")
),
# No analysis-level iteration needed
for_each_analysis = NULL,
# Common arguments for all plans
universal_argset = list(
surveillance_system = "example_surveillance",
data_format = "cs9_standard"
),
# Automatically upsert results at the end of each plan
upsert_at_end_of_each_plan = TRUE,
insert_at_end_of_each_plan = FALSE,
# Function names
action_fn_name = "weather_action",
data_selector_fn_name = "weather_data_selector",
# Table mapping
tables = list(
weather_daily = "anon_weather_daily_data",
weather_weekly = "anon_weather_weekly_summary"
)
)
# Verify the task was added
print("Available tasks:")
print(names(ss$tasks))

Once configured, you can run the surveillance task. Tasks are registered under names of the form name_grouping_name_action_name_variant, so the task defined above is available as weather_process_weekly_summary:
# Get task information
task_name <- "weather_process_weekly_summary"
# View task plans and analyses
if(task_name %in% names(ss$tasks)) {
plans_info <- ss$shortcut_get_plans_argsets_as_dt(task_name)
cat("Task execution plans:\n")
print(plans_info)
# Run the task
cat("\nExecuting surveillance task...\n")
ss$run_task(task_name)
# Check task completion status
task_stats <- cs9::get_config_tasks_stats(task = task_name, last_run = TRUE)
if(nrow(task_stats) > 0) {
cat("\nTask execution completed successfully!\n")
print(task_stats[, .(task, datetime, status)])
}
}

CS9 provides several utilities for monitoring task execution:
# Get all available tables
cat("All surveillance system tables:\n")
print(names(ss$tables))
# Get task execution logs
recent_logs <- cs9::get_config_log(
task = "weather_process_weekly",
start_date = Sys.Date() - 7
)
if(nrow(recent_logs) > 0) {
cat("\nRecent task logs:\n")
print(recent_logs[, .(datetime, task, message)])
}
# Get task performance statistics
task_performance <- cs9::get_config_tasks_stats(last_run = TRUE)
if(nrow(task_performance) > 0) {
cat("\nTask performance summary:\n")
print(task_performance[, .(task, datetime, runtime_seconds, status)])
}
# Access data for a specific plan (for debugging)
if(task_name %in% names(ss$tasks)) {
# Get data for first plan
debug_data <- ss$shortcut_get_data(task_name, index_plan = 1)
cat("\nData structure for plan 1:\n")
if(!is.null(debug_data$data)) {
print(str(debug_data$data))
}
# Get arguments for first plan, first analysis
debug_args <- ss$shortcut_get_argset(task_name, index_plan = 1, index_analysis = 1)
cat("\nArgument set for plan 1, analysis 1:\n")
print(debug_args)
}

For computationally intensive tasks, you can enable parallel processing:
# Configure a task for parallel execution
ss$add_task(
name_grouping = "weather",
name_action = "analyze",
name_variant = "trends",
cores = 4, # Use 4 CPU cores
# Large number of plans that can benefit from parallelization
for_each_plan = list(
location_code = sprintf("LOC%03d", 1:100), # 100 locations
analysis_year = rep(2020:2024, length.out = 100) # years 2020-2024, cycled across the locations
),
universal_argset = list(
min_data_quality = 0.8,
trend_method = "linear_regression"
),
action_fn_name = "weather_trend_action",
data_selector_fn_name = "weather_trend_data_selector",
tables = list(
input_data = "anon_weather_daily_data",
trend_results = "anon_weather_trends"
)
)

For complex scenarios, you can use a function to generate plans dynamically:
# Function to generate plans based on database state
generate_weather_plans <- function() {
# In real implementation, query database for available data
available_locations <- c("LOC001", "LOC002", "LOC003")
available_years <- 2020:2024
# Generate plans for locations with sufficient data
plans <- list()
for(location in available_locations) {
for(year in available_years) {
plans <- append(plans, list(list(
location_code = location,
year = year,
min_date = paste0(year, "-01-01"),
max_date = paste0(year, "-12-31")
)))
}
}
return(list(
for_each_plan = plans,
for_each_analysis = NULL
))
}
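Because the generator is plain R, you can sanity-check its output before wiring it into a task:

# Sanity check: 3 locations x 5 years should yield 15 plans
plans <- generate_weather_plans()
length(plans$for_each_plan) # 15
str(plans$for_each_plan[[1]]) # location_code, year, min_date, max_date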
# Use the custom plan generator
ss$add_task(
name_grouping = "weather",
name_action = "validate",
name_variant = "data_quality",
cores = 2,
# Use function to generate plans
plan_analysis_fn_name = "generate_weather_plans",
for_each_plan = NULL, # Will be generated by function
for_each_analysis = NULL,
universal_argset = list(
quality_threshold = 0.85,
validation_rules = c("completeness", "consistency", "accuracy")
),
action_fn_name = "weather_validation_action",
data_selector_fn_name = "weather_validation_data_selector",
tables = list(
source_data = "anon_weather_daily_data",
validation_results = "anon_weather_validation"
)
)

Always include robust error handling in your task functions:
robust_weather_action <- function(data, argset, tables) {
tryCatch({
# Validate input data
if(is.null(data$data) || nrow(data$data) == 0) {
warning("No data available for processing")
return(invisible(NULL))
}
# Check required columns
required_cols <- c("date", "location_code", "temperature_max")
missing_cols <- setdiff(required_cols, names(data$data))
if(length(missing_cols) > 0) {
stop("Missing required columns: ", paste(missing_cols, collapse = ", "))
}
# Perform analysis with validation
result <- data$data[, .(
avg_temp = mean(temperature_max, na.rm = TRUE),
record_count = .N
), by = location_code]
# Validate results before storing
if(any(is.na(result$avg_temp))) {
warning("Some temperature averages could not be calculated")
}
# Log successful completion
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_analysis",
paste("Successfully processed", nrow(result), "location summaries")
)
}, error = function(e) {
# Log errors for debugging
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_analysis",
paste("ERROR:", e$message)
)
stop(e)
})
}
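The empty-data guard returns before any logging call, so it can be exercised without a database backend:

# Trigger the empty-data guard; no cs9 backend is touched
robust_weather_action(data = list(data = NULL), argset = list(), tables = NULL)
# Warning message: No data available for processing

Implement comprehensive data validation: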
validate_weather_data <- function(data) {
validation_results <- list()
# Check data completeness
completeness <- data[, lapply(.SD, function(x) sum(!is.na(x))/.N)]
validation_results$completeness <- completeness
# Check data ranges
temp_range_check <- data[temperature_max < -50 | temperature_max > 60, .N]
validation_results$temperature_outliers <- temp_range_check
# Check temporal consistency (largest gap, in days, between consecutive records)
date_gaps <- data[order(date), .(
max_gap = if (.N > 1) max(as.numeric(diff(date)), na.rm = TRUE) else NA_real_
), by = location_code]
validation_results$temporal_gaps <- date_gaps
return(validation_results)
}
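A quick usage sketch on a small hand-made table (the values are illustrative):

# Run the validator on a tiny data.table
toy <- data.table::data.table(
  date = as.Date("2024-01-01") + 0:6,
  location_code = "LOC001",
  temperature_max = c(21, 19, NA, 22, 20, 18, 23)
)
checks <- validate_weather_data(toy)
print(checks$completeness) # share of non-missing values per column
print(checks$temperature_outliers) # count of implausible readings

Structure your task functions for reusability: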
# Utility function for common data transformations
standardize_weather_data <- function(data) {
# Apply common transformations
data[, temperature_celsius := round(temperature_max, 1)] # assumes input is already in Celsius
data[, date_formatted := format(date, "%Y-%m-%d")]
# weekdays() returns locale-dependent names; this check assumes an English locale
data[, is_weekend := weekdays(date) %in% c("Saturday", "Sunday")]
return(data)
}
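A quick check of the standardizer, with dates chosen so that the weekend flag is visible:

# Try the standardizer on a two-row table: a Saturday and a Monday
tiny_weather <- data.table::data.table(
  date = as.Date(c("2024-01-06", "2024-01-08")),
  temperature_max = c(3.14159, -1.5)
)
standardize_weather_data(tiny_weather)[]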
# Reusable data selector template
create_weather_data_selector <- function(date_column = "date",
location_column = "location_code") {
function(argset, tables) {
# Common data selection logic (requires dplyr, rlang and the magrittr pipe)
query_data <- tables[[argset$source_table]]$tbl() %>%
dplyr::filter(
!!rlang::sym(date_column) >= argset$date_from,
!!rlang::sym(date_column) <= argset$date_to,
!!rlang::sym(location_column) %in% argset$location_codes
) %>%
dplyr::collect() %>%
data.table::as.data.table()
# Apply standardization
standardized_data <- standardize_weather_data(query_data)
return(list(data = standardized_data))
}
}

This vignette covered the complete workflow for creating surveillance tasks in CS9: defining database tables, writing data selector and action functions, configuring and running tasks, monitoring their execution, and applying best practices for error handling, data validation, and reusable code.
The CS9 framework provides a robust foundation for building scalable surveillance systems with automated pipeline execution, comprehensive logging, and flexible task orchestration.
For more information about CS9:
vignette("cs9") - Package overview and architecture
conceptsvignette("installation") - Complete installation and
setup guidevignette("file-layout") - Package structure and file
organization patterns?SurveillanceSystem_v9 - Main surveillance system class
documentation?check_environment_setup - Environment configuration
diagnostics