Skip to content

Commit

Permalink
Hierarchical continuous aggregates (@rollup) (#19)
Browse files Browse the repository at this point in the history
* feat: init rollup

* *

* feat: more rollup stuff

* test: fix *

* *

* fix: *

* change rollup column name

* docs: *

* test: add timeout

* feat: add coverage for bucket column enforcing

* docs: *

* feat: add rollup example to sequelize

* feat: add checks for nested rollups

* docs: *
  • Loading branch information
danstarns authored Feb 10, 2025
1 parent f8417e6 commit d9c4f5d
Show file tree
Hide file tree
Showing 28 changed files with 1,527 additions and 41 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ jobs:
- name: Run Core Package Tests
run: pnpm run --filter @timescaledb/core test

- name: Run TypeORM Lib Tests
run: pnpm run --filter @timescaledb/typeorm test

- name: Run Sequelize Migration
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/sequelize
Expand All @@ -67,12 +70,12 @@ jobs:
DATABASE_URL: postgres://postgres:password@localhost:5432/typeorm
run: pnpm run --filter @timescaledb/example-node-typeorm migrate

- name: Run Sequelize Tests
- name: Run Sequelize Example Tests
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/sequelize
run: pnpm run --filter @timescaledb/example-node-sequelize test

- name: Run TypeORM Tests
- name: Run TypeORM Example Tests
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/typeorm
run: pnpm run --filter @timescaledb/example-node-typeorm test
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ If you are looking to setup this project locally, you can follow the instruction

## Feature Compatibility

| Feature | Core | TypeORM | Sequelize |
| ------------------------------------------------------------------------------------------------------------------------------------ | ---- | ------- | -------------------- |
| **Core Functions** | | | |
| [Create Hypertable](https://docs.timescale.com/api/latest/hypertable/create_hypertable/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Compression](https://docs.timescale.com/api/latest/compression/alter_table_compression/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Compression Policy](https://docs.timescale.com/api/latest/compression/add_compression_policy/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Retention Policy](https://docs.timescale.com/use-timescale/latest/data-retention/create-a-retention-policy/) ||||
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) || ✅ Auto | ✅ Manual (via Core) |
| **Hyperfunctions** | | | |
| [Time Bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) ||| ✅ Manual (via Core) |
| [Candlestick Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/financial-analysis/candlestick_agg/) ||| ✅ Manual (via Core) |
| [Stats Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/statistical-and-regression-analysis/stats_agg-one-variable/) ||||
| [Percentile Approximation](https://docs.timescale.com/api/latest/hyperfunctions/percentile-approximation/uddsketch/) ||||
| **Info Views** | | | |
| [Chunks](https://docs.timescale.com/api/latest/hypertable/show_chunks/) ||||
| [User Defined Actions](https://docs.timescale.com/api/latest/actions/) ||||
| [Compression Settings](https://docs.timescale.com/api/latest/compression/) ||| ✅ Manual (via Core) |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) ||| ✅ Manual (via Core) |
| Feature | Core | TypeORM | Sequelize |
| ----------------------------------------------------------------------------------------------------------------------------------------------- | ---- | ------- | -------------------- |
| **Core Functions** | | | |
| [Create Hypertable](https://docs.timescale.com/api/latest/hypertable/create_hypertable/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Compression](https://docs.timescale.com/api/latest/compression/alter_table_compression/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Compression Policy](https://docs.timescale.com/api/latest/compression/add_compression_policy/) || ✅ Auto | ✅ Manual (via Core) |
| [Add Retention Policy](https://docs.timescale.com/use-timescale/latest/data-retention/create-a-retention-policy/) ||||
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) || ✅ Auto | ✅ Manual (via Core) |
| **Hyperfunctions** | | | |
| [Time Bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) ||| ✅ Manual (via Core) |
| [Candlestick Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/financial-analysis/candlestick_agg/) ||| ✅ Manual (via Core) |
| [Stats Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/statistical-and-regression-analysis/stats_agg-one-variable/) ||||
| [Percentile Approximation](https://docs.timescale.com/api/latest/hyperfunctions/percentile-approximation/uddsketch/) ||||
| **Info Views** | | | |
| [Chunks](https://docs.timescale.com/api/latest/hypertable/show_chunks/) ||||
| [User Defined Actions](https://docs.timescale.com/api/latest/actions/) ||||
| [Compression Settings](https://docs.timescale.com/api/latest/compression/) ||| ✅ Manual (via Core) |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) ||| ✅ Manual (via Core) |
| [Hierarchical continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/hierarchical-continuous-aggregates/) ||| ✅ Manual (via Core) |

Legend:

Expand Down
38 changes: 38 additions & 0 deletions examples/node-sequelize/config/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { TimescaleDB } from '@timescaledb/core';
import { AggregateType, RollupFunctionType } from '@timescaledb/schemas';

export const DailyPageStats = TimescaleDB.createRollup({
continuousAggregateOptions: {
name: 'daily_page_stats',
bucket_interval: '1 day',
refresh_policy: {
start_offset: '30 days',
end_offset: '1 day',
schedule_interval: '1 day',
},
},
rollupOptions: {
sourceView: 'hourly_page_views',
name: 'daily_page_stats',
bucketInterval: '1 day',
materializedOnly: false,
bucketColumn: {
source: 'bucket',
target: 'bucket',
},
rollupRules: [
{
rollupFn: RollupFunctionType.Rollup,
sourceColumn: 'total_views',
targetColumn: 'sum_total_views',
aggregateType: AggregateType.Sum,
},
{
rollupFn: RollupFunctionType.Rollup,
sourceColumn: 'unique_users',
targetColumn: 'avg_unique_users',
aggregateType: AggregateType.Avg,
},
],
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const path = require('path');
const { DailyPageStats } = require(path.join(__dirname, '../dist/config/DailyPageStats'));

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface) {
const sql = DailyPageStats.up().build();
await queryInterface.sequelize.query(sql);

const refreshPolicy = DailyPageStats.up().getRefreshPolicy();
if (refreshPolicy) {
await queryInterface.sequelize.query(refreshPolicy);
}
},

async down(queryInterface) {
const statements = DailyPageStats.down().build();
for await (const sql of statements) {
await queryInterface.sequelize.query(sql);
}
},
};
33 changes: 33 additions & 0 deletions examples/node-sequelize/src/models/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Model, DataTypes } from 'sequelize';
import sequelize from '../database';

class DailyPageStats extends Model {
public bucket!: Date;
public sumTotalViews!: number;
public avgUniqueUsers!: number;
}

DailyPageStats.init(
{
bucket: {
type: DataTypes.DATE,
primaryKey: true,
},
sumTotalViews: {
type: DataTypes.INTEGER,
field: 'sum_total_views',
},
avgUniqueUsers: {
type: DataTypes.FLOAT,
field: 'avg_unique_users',
},
},
{
sequelize,
tableName: 'daily_page_stats',
timestamps: false,
underscored: true,
},
);

export default DailyPageStats;
22 changes: 22 additions & 0 deletions examples/node-sequelize/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Router } from 'express';
import PageLoad from '../models/PageLoad';
import { getPageViewStats, getCompressionStats, getCandlestickData } from '../services/timescale';
import HourlyPageView from '../models/HourlyPageView';
import DailyPageStats from '../models/DailyPageStats';
import { Op } from 'sequelize';
import { WhereClauseSchema } from '@timescaledb/schemas';

Expand Down Expand Up @@ -90,4 +91,25 @@ router.get('/candlestick', async (req, res) => {
}
});

router.get('/daily', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const dailyStats = await DailyPageStats.findAll({
where: {
bucket: {
[Op.between]: [start, end],
},
},
order: [['bucket', 'DESC']],
});

res.json(dailyStats);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get daily stats' });
}
});

export default router;
76 changes: 76 additions & 0 deletions examples/node-sequelize/tests/daily.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { describe, it, expect, beforeEach, afterAll } from '@jest/globals';
import { request } from './mock-request';
import sequelize from '../src/database';
import PageLoad from '../src/models/PageLoad';
import { faker } from '@faker-js/faker';

describe('GET /api/daily', () => {
beforeEach(async () => {
await PageLoad.destroy({ where: {} });
});

afterAll(async () => {
await sequelize.close();
});

it('should return daily stats for a given time range', async () => {
const baseTime = new Date();
baseTime.setHours(0, 0, 0, 0); // Start of day

// Create test data across 3 days
for (let day = 0; day < 3; day++) {
const dayStart = new Date(baseTime.getTime() - day * 24 * 3600000);

// Create multiple records per day across different hours
for (let hour = 0; hour < 24; hour += 4) {
const time = new Date(dayStart.getTime() + hour * 3600000);

// Create multiple records per hour
for (let i = 0; i < 5; i++) {
await PageLoad.create({
userAgent: faker.internet.userAgent(),
time: new Date(time.getTime() + i * 60000), // Spread over minutes
});
}
}
}

// Manually refresh the continuous aggregate for hourly views
await sequelize.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`);

// Wait for hourly refresh to complete
await new Promise((resolve) => setTimeout(resolve, 2000));

// Manually refresh the rollup for daily stats
await sequelize.query(`CALL refresh_continuous_aggregate('daily_page_stats', null, null);`);

// Wait for daily refresh to complete
await new Promise((resolve) => setTimeout(resolve, 2000));

const start = new Date(baseTime.getTime() - 4 * 24 * 3600000); // 4 days ago
const end = baseTime;

const response = await request().get('/api/daily').query({
start: start.toISOString(),
end: end.toISOString(),
});

expect(response.status).toBe(200);
expect(response.body).toHaveLength(3);

const firstDay = response.body[0];
expect(firstDay).toHaveProperty('bucket');
expect(firstDay).toHaveProperty('sumTotalViews');
expect(firstDay).toHaveProperty('avgUniqueUsers');

// Each day should have:
// - 6 time slots (every 4 hours)
// - 5 views per time slot
// - 30 total views per day
response.body.forEach((day: any) => {
expect(day.sumTotalViews).toBe('30');
expect(Number(day.avgUniqueUsers)).toBeGreaterThan(0);
expect(Number(day.avgUniqueUsers)).toBeLessThanOrEqual(30);
});
});
});
3 changes: 2 additions & 1 deletion examples/node-typeorm/src/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PageLoad } from './models/PageLoad';
import dotenv from 'dotenv';
import { HourlyPageViews } from './models/HourlyPageViews';
import { StockPrice } from './models/StockPrice';
import { DailyPageStats } from './models/DailyPageStats';

dotenv.config();

Expand All @@ -13,6 +14,6 @@ export const AppDataSource = new DataSource({
url: process.env.DATABASE_URL,
synchronize: false,
logging: process.env.NODE_ENV === 'development',
entities: [PageLoad, HourlyPageViews, StockPrice],
entities: [PageLoad, HourlyPageViews, StockPrice, DailyPageStats],
migrations: ['migrations/*.ts'],
});
31 changes: 31 additions & 0 deletions examples/node-typeorm/src/models/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Rollup, BucketColumn, RollupColumn } from '@timescaledb/typeorm';
import { HourlyPageViews } from './HourlyPageViews';
import { AggregateType } from '@timescaledb/schemas';

@Rollup(HourlyPageViews, {
name: 'daily_page_stats',
bucket_interval: '1 day',
refresh_policy: {
start_offset: '30 days',
end_offset: '1 day',
schedule_interval: '1 day',
},
})
export class DailyPageStats {
@BucketColumn({
source_column: 'bucket',
})
bucket!: Date;

@RollupColumn({
type: AggregateType.Sum,
source_column: 'total_views',
})
sum_total_views!: number;

@RollupColumn({
type: AggregateType.Avg,
source_column: 'unique_users',
})
avg_unique_users!: number;
}
21 changes: 21 additions & 0 deletions examples/node-typeorm/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PageLoad } from '../models/PageLoad';
import { HourlyPageViews } from '../models/HourlyPageViews';
import { StockPrice } from '../models/StockPrice';
import { WhereClauseSchema } from '@timescaledb/schemas';
import { DailyPageStats } from '../models/DailyPageStats';

const router = Router();

Expand Down Expand Up @@ -84,6 +85,26 @@ router.get('/hourly', async (req, res) => {
}
});

router.get('/daily', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const query = AppDataSource.getRepository(DailyPageStats)
.createQueryBuilder()
.where('bucket >= :start', { start })
.andWhere('bucket <= :end', { end })
.orderBy('bucket', 'DESC');

const dailyStats = await query.getMany();

res.json(dailyStats);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get daily stats' });
}
});

router.get('/candlestick', async (req, res) => {
try {
const start = new Date(req.query.start as string);
Expand Down
Loading

0 comments on commit d9c4f5d

Please sign in to comment.