Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hierarchical continuous aggregates (@Rollup) #19

Merged
merged 14 commits into from
Feb 10, 2025
Merged
7 changes: 5 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ jobs:
- name: Run Core Package Tests
run: pnpm run --filter @timescaledb/core test

- name: Run TypeORM Lib Tests
run: pnpm run --filter @timescaledb/typeorm test

- name: Run Sequelize Migration
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/sequelize
Expand All @@ -67,12 +70,12 @@ jobs:
DATABASE_URL: postgres://postgres:password@localhost:5432/typeorm
run: pnpm run --filter @timescaledb/example-node-typeorm migrate

- name: Run Sequelize Tests
- name: Run Sequelize Example Tests
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/sequelize
run: pnpm run --filter @timescaledb/example-node-sequelize test

- name: Run TypeORM Tests
- name: Run TypeORM Example Tests
env:
DATABASE_URL: postgres://postgres:password@localhost:5432/typeorm
run: pnpm run --filter @timescaledb/example-node-typeorm test
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ If you are looking to setup this project locally, you can follow the instruction

## Feature Compatibility

| Feature | Core | TypeORM | Sequelize |
| ------------------------------------------------------------------------------------------------------------------------------------ | ---- | ------- | -------------------- |
| **Core Functions** | | | |
| [Create Hypertable](https://docs.timescale.com/api/latest/hypertable/create_hypertable/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Compression](https://docs.timescale.com/api/latest/compression/alter_table_compression/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Compression Policy](https://docs.timescale.com/api/latest/compression/add_compression_policy/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Retention Policy](https://docs.timescale.com/use-timescale/latest/data-retention/create-a-retention-policy/) | ❌ | ❌ | ❌ |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| **Hyperfunctions** | | | |
| [Time Bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Candlestick Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/financial-analysis/candlestick_agg/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Stats Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/statistical-and-regression-analysis/stats_agg-one-variable/) | ❌ | ❌ | ❌ |
| [Percentile Approximation](https://docs.timescale.com/api/latest/hyperfunctions/percentile-approximation/uddsketch/) | ❌ | ❌ | ❌ |
| **Info Views** | | | |
| [Chunks](https://docs.timescale.com/api/latest/hypertable/show_chunks/) | ❌ | ❌ | ❌ |
| [User Defined Actions](https://docs.timescale.com/api/latest/actions/) | ❌ | ❌ | ❌ |
| [Compression Settings](https://docs.timescale.com/api/latest/compression/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) | ✅ | ✅ | ✅ Manual (via Core) |
| Feature | Core | TypeORM | Sequelize |
| ----------------------------------------------------------------------------------------------------------------------------------------------- | ---- | ------- | -------------------- |
| **Core Functions** | | | |
| [Create Hypertable](https://docs.timescale.com/api/latest/hypertable/create_hypertable/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Compression](https://docs.timescale.com/api/latest/compression/alter_table_compression/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Compression Policy](https://docs.timescale.com/api/latest/compression/add_compression_policy/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| [Add Retention Policy](https://docs.timescale.com/use-timescale/latest/data-retention/create-a-retention-policy/) | ❌ | ❌ | ❌ |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) | ✅ | ✅ Auto | ✅ Manual (via Core) |
| **Hyperfunctions** | | | |
| [Time Bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Candlestick Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/financial-analysis/candlestick_agg/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Stats Aggregates](https://docs.timescale.com/api/latest/hyperfunctions/statistical-and-regression-analysis/stats_agg-one-variable/) | ❌ | ❌ | ❌ |
| [Percentile Approximation](https://docs.timescale.com/api/latest/hyperfunctions/percentile-approximation/uddsketch/) | ❌ | ❌ | ❌ |
| **Info Views** | | | |
| [Chunks](https://docs.timescale.com/api/latest/hypertable/show_chunks/) | ❌ | ❌ | ❌ |
| [User Defined Actions](https://docs.timescale.com/api/latest/actions/) | ❌ | ❌ | ❌ |
| [Compression Settings](https://docs.timescale.com/api/latest/compression/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Continuous Aggregates](https://docs.timescale.com/api/latest/continuous-aggregates/create_materialized_view/) | ✅ | ✅ | ✅ Manual (via Core) |
| [Hierarchical continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/hierarchical-continuous-aggregates/) | ✅ | ✅ | ✅ Manual (via Core) |

Legend:

Expand Down
38 changes: 38 additions & 0 deletions examples/node-sequelize/config/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { TimescaleDB } from '@timescaledb/core';
import { AggregateType, RollupFunctionType } from '@timescaledb/schemas';

export const DailyPageStats = TimescaleDB.createRollup({
continuousAggregateOptions: {
name: 'daily_page_stats',
bucket_interval: '1 day',
refresh_policy: {
start_offset: '30 days',
end_offset: '1 day',
schedule_interval: '1 day',
},
},
rollupOptions: {
sourceView: 'hourly_page_views',
name: 'daily_page_stats',
bucketInterval: '1 day',
materializedOnly: false,
bucketColumn: {
source: 'bucket',
target: 'bucket',
},
rollupRules: [
{
rollupFn: RollupFunctionType.Rollup,
sourceColumn: 'total_views',
targetColumn: 'sum_total_views',
aggregateType: AggregateType.Sum,
},
{
rollupFn: RollupFunctionType.Rollup,
sourceColumn: 'unique_users',
targetColumn: 'avg_unique_users',
aggregateType: AggregateType.Avg,
},
],
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const path = require('path');
const { DailyPageStats } = require(path.join(__dirname, '../dist/config/DailyPageStats'));

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface) {
const sql = DailyPageStats.up().build();
await queryInterface.sequelize.query(sql);

const refreshPolicy = DailyPageStats.up().getRefreshPolicy();
if (refreshPolicy) {
await queryInterface.sequelize.query(refreshPolicy);
}
},

async down(queryInterface) {
const statements = DailyPageStats.down().build();
for await (const sql of statements) {
await queryInterface.sequelize.query(sql);
}
},
};
33 changes: 33 additions & 0 deletions examples/node-sequelize/src/models/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Model, DataTypes } from 'sequelize';
import sequelize from '../database';

class DailyPageStats extends Model {
public bucket!: Date;
public sumTotalViews!: number;
public avgUniqueUsers!: number;
}

DailyPageStats.init(
{
bucket: {
type: DataTypes.DATE,
primaryKey: true,
},
sumTotalViews: {
type: DataTypes.INTEGER,
field: 'sum_total_views',
},
avgUniqueUsers: {
type: DataTypes.FLOAT,
field: 'avg_unique_users',
},
},
{
sequelize,
tableName: 'daily_page_stats',
timestamps: false,
underscored: true,
},
);

export default DailyPageStats;
22 changes: 22 additions & 0 deletions examples/node-sequelize/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Router } from 'express';
import PageLoad from '../models/PageLoad';
import { getPageViewStats, getCompressionStats, getCandlestickData } from '../services/timescale';
import HourlyPageView from '../models/HourlyPageView';
import DailyPageStats from '../models/DailyPageStats';
import { Op } from 'sequelize';
import { WhereClauseSchema } from '@timescaledb/schemas';

Expand Down Expand Up @@ -90,4 +91,25 @@ router.get('/candlestick', async (req, res) => {
}
});

router.get('/daily', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const dailyStats = await DailyPageStats.findAll({
where: {
bucket: {
[Op.between]: [start, end],
},
},
order: [['bucket', 'DESC']],
});

res.json(dailyStats);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get daily stats' });
}
});

export default router;
76 changes: 76 additions & 0 deletions examples/node-sequelize/tests/daily.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { describe, it, expect, beforeEach, afterAll } from '@jest/globals';
import { request } from './mock-request';
import sequelize from '../src/database';
import PageLoad from '../src/models/PageLoad';
import { faker } from '@faker-js/faker';

describe('GET /api/daily', () => {
beforeEach(async () => {
await PageLoad.destroy({ where: {} });
});

afterAll(async () => {
await sequelize.close();
});

it('should return daily stats for a given time range', async () => {
const baseTime = new Date();
baseTime.setHours(0, 0, 0, 0); // Start of day

// Create test data across 3 days
for (let day = 0; day < 3; day++) {
const dayStart = new Date(baseTime.getTime() - day * 24 * 3600000);

// Create multiple records per day across different hours
for (let hour = 0; hour < 24; hour += 4) {
const time = new Date(dayStart.getTime() + hour * 3600000);

// Create multiple records per hour
for (let i = 0; i < 5; i++) {
await PageLoad.create({
userAgent: faker.internet.userAgent(),
time: new Date(time.getTime() + i * 60000), // Spread over minutes
});
}
}
}

// Manually refresh the continuous aggregate for hourly views
await sequelize.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`);

// Wait for hourly refresh to complete
await new Promise((resolve) => setTimeout(resolve, 2000));

// Manually refresh the rollup for daily stats
await sequelize.query(`CALL refresh_continuous_aggregate('daily_page_stats', null, null);`);

// Wait for daily refresh to complete
await new Promise((resolve) => setTimeout(resolve, 2000));

const start = new Date(baseTime.getTime() - 4 * 24 * 3600000); // 4 days ago
const end = baseTime;

const response = await request().get('/api/daily').query({
start: start.toISOString(),
end: end.toISOString(),
});

expect(response.status).toBe(200);
expect(response.body).toHaveLength(3);

const firstDay = response.body[0];
expect(firstDay).toHaveProperty('bucket');
expect(firstDay).toHaveProperty('sumTotalViews');
expect(firstDay).toHaveProperty('avgUniqueUsers');

// Each day should have:
// - 6 time slots (every 4 hours)
// - 5 views per time slot
// - 30 total views per day
response.body.forEach((day: any) => {
expect(day.sumTotalViews).toBe('30');
expect(Number(day.avgUniqueUsers)).toBeGreaterThan(0);
expect(Number(day.avgUniqueUsers)).toBeLessThanOrEqual(30);
});
});
});
3 changes: 2 additions & 1 deletion examples/node-typeorm/src/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PageLoad } from './models/PageLoad';
import dotenv from 'dotenv';
import { HourlyPageViews } from './models/HourlyPageViews';
import { StockPrice } from './models/StockPrice';
import { DailyPageStats } from './models/DailyPageStats';

dotenv.config();

Expand All @@ -13,6 +14,6 @@ export const AppDataSource = new DataSource({
url: process.env.DATABASE_URL,
synchronize: false,
logging: process.env.NODE_ENV === 'development',
entities: [PageLoad, HourlyPageViews, StockPrice],
entities: [PageLoad, HourlyPageViews, StockPrice, DailyPageStats],
migrations: ['migrations/*.ts'],
});
31 changes: 31 additions & 0 deletions examples/node-typeorm/src/models/DailyPageStats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Rollup, BucketColumn, RollupColumn } from '@timescaledb/typeorm';
import { HourlyPageViews } from './HourlyPageViews';
import { AggregateType } from '@timescaledb/schemas';

@Rollup(HourlyPageViews, {
name: 'daily_page_stats',
bucket_interval: '1 day',
refresh_policy: {
start_offset: '30 days',
end_offset: '1 day',
schedule_interval: '1 day',
},
})
export class DailyPageStats {
@BucketColumn({
source_column: 'bucket',
})
bucket!: Date;

@RollupColumn({
type: AggregateType.Sum,
source_column: 'total_views',
})
sum_total_views!: number;

@RollupColumn({
type: AggregateType.Avg,
source_column: 'unique_users',
})
avg_unique_users!: number;
}
21 changes: 21 additions & 0 deletions examples/node-typeorm/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PageLoad } from '../models/PageLoad';
import { HourlyPageViews } from '../models/HourlyPageViews';
import { StockPrice } from '../models/StockPrice';
import { WhereClauseSchema } from '@timescaledb/schemas';
import { DailyPageStats } from '../models/DailyPageStats';

const router = Router();

Expand Down Expand Up @@ -84,6 +85,26 @@ router.get('/hourly', async (req, res) => {
}
});

router.get('/daily', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const query = AppDataSource.getRepository(DailyPageStats)
.createQueryBuilder()
.where('bucket >= :start', { start })
.andWhere('bucket <= :end', { end })
.orderBy('bucket', 'DESC');

const dailyStats = await query.getMany();

res.json(dailyStats);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get daily stats' });
}
});

router.get('/candlestick', async (req, res) => {
try {
const start = new Date(req.query.start as string);
Expand Down
Loading
Loading