Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/linter.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: Linter

on:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ tf_apply:
terraform -chdir=infra/tf init && terraform -chdir=infra/tf apply -auto-approve

bigquery_export_deploy:
cd infra/bigquery-export && npm install && npm run buildpack
cd infra/bigquery-export && npm run build

#bigquery_export_spark_deploy:
# cd infra/bigquery_export_spark && gcloud builds submit --region=global --tag us-docker.pkg.dev/httparchive/bigquery-spark-procedures/firestore_export:latest
31 changes: 31 additions & 0 deletions definitions/output/reports/tech_report_geos.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const pastMonth = constants.fnPastMonth(constants.currentMonth)

publish('tech_report_geos', {
schema: 'reports',
type: 'table',
tags: ['tech_report']
}).query(ctx => `
SELECT
geo,
adoption.mobile AS mobile_origins
FROM ${ctx.ref('reports', 'tech_report_adoption')}
WHERE
date = '${pastMonth}'
AND rank = 'ALL'
AND technology = 'ALL'
AND version = 'ALL'
${constants.devRankFilter}
`).postOps(ctx => `
SELECT
reports.run_export_job(
JSON '''{
"destination": "firestore",
"config": {
"database": "tech-report-api-${constants.environment}",
"collection": "geos",
"type": "dict"
},
"query": "SELECT * FROM ${ctx.self()}"
}'''
);
`)
31 changes: 31 additions & 0 deletions definitions/output/reports/tech_report_ranks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const pastMonth = constants.fnPastMonth(constants.currentMonth)

publish('tech_report_ranks', {
schema: 'reports',
type: 'table',
tags: ['tech_report']
}).query(ctx => `
SELECT
rank,
adoption.mobile AS mobile_origins
FROM ${ctx.ref('reports', 'tech_report_adoption')}
WHERE
date = '${pastMonth}'
AND geo = 'ALL'
AND technology = 'ALL'
AND version = 'ALL'
${constants.devRankFilter}
`).postOps(ctx => `
SELECT
reports.run_export_job(
JSON '''{
"destination": "firestore",
"config": {
"database": "tech-report-api-${constants.environment}",
"collection": "ranks",
"type": "dict"
},
"query": "SELECT * FROM ${ctx.self()}"
}'''
);
`)
9 changes: 9 additions & 0 deletions infra/bigquery-export/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
node_modules
npm-debug.log
.git
.gitignore
.env
.nyc_output
coverage
*.md
.DS_Store
10 changes: 6 additions & 4 deletions infra/bigquery-export/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ FROM node:current-slim

WORKDIR /usr/src/app

COPY . .
# Copy package files first for better caching
COPY package*.json ./

# Clean up the node_modules directory
RUN rm -rf node_modules
# Install dependencies
RUN npm ci --only=production --quiet --no-fund --no-audit

RUN npm ci --only=production
# Copy source code
COPY . .

CMD ["node", "index.js"]
7 changes: 7 additions & 0 deletions infra/bigquery-export/cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
steps:
- name: "gcr.io/cloud-builders/docker"
args:
["build", "-t", "us.gcr.io/httparchive/cloud-run/bigquery-export", "."]
images:
- "us.gcr.io/httparchive/cloud-run/bigquery-export"
218 changes: 134 additions & 84 deletions infra/bigquery-export/firestore.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,135 +3,185 @@ import { BigQueryExport } from './bigquery.js'

export class FirestoreBatch {
constructor () {
this.firestore = new Firestore()
this.firestore = new Firestore({
gaxOptions: {
grpc: {
max_receive_message_length: 500 * 1024 * 1024, // 500MB
max_send_message_length: 500 * 1024 * 1024, // 500MB
'grpc.max_connection_idle_ms': 5 * 60 * 1000, // 5 minutes
'grpc.keepalive_time_ms': 30 * 1000, // 30 seconds
'grpc.keepalive_timeout_ms': 60 * 1000, // 1 minute
'grpc.keepalive_permit_without_calls': true
}
}
})
this.bigquery = new BigQueryExport()
this.batchSize = 500
this.maxConcurrentBatches = 200

// Configuration constants
this.config = {
timeout: 10 * 60 * 1000, // 10 minutes
progressReportInterval: 200000, // Report progress every N operations
flushThreshold: 200000 // Flush BulkWriter every N operations
}

this.reset()
}

reset () {
this.processedDocs = 0
this.totalDocs = 0
this.bulkWriter = null
}

queueBatch (operation) {
const batch = this.firestore.batch()

this.currentBatch.forEach((doc) => {
if (operation === 'delete') {
batch.delete(doc.ref)
} else if (operation === 'set') {
const docRef = this.firestore.collection(this.collectionName).doc()
batch.set(docRef, doc)
} else {
throw new Error('Invalid operation')
createBulkWriter (operation) {
const bulkWriter = this.firestore.bulkWriter()

// Configure error handling with progress info
bulkWriter.onWriteError((error) => {
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ''
console.warn(`${operation} operation failed${progressInfo}:`, error.message)

// Retry on transient errors, fail on permanent ones
const retryableErrors = ['deadline-exceeded', 'unavailable', 'resource-exhausted']
return retryableErrors.includes(error.code)
})

// Track progress on successful writes
bulkWriter.onWriteResult(() => {
this.processedDocs++

// Report progress periodically
if (this.processedDocs % this.config.progressReportInterval === 0) {
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ` (${this.processedDocs} processed)`
console.log(`Progress${progressInfo} - ${operation}ing documents in ${this.collectionName}`)
}
})
this.batchPromises.push(batch)
this.currentBatch = []
}

async commitBatches () {
console.log(`Committing ${this.batchPromises.length} batches to ${this.collectionName}`)
await Promise.all(
this.batchPromises.map(async (batchPromise) => await batchPromise.commit()
.catch((error) => {
console.error('Error committing batch:', error)
throw error
})
)
)
this.batchPromises = []
return bulkWriter
}

async finalFlush (operation) {
if (this.currentBatch.length > 0) {
this.queueBatch(operation)
buildQuery (collectionRef) {
const queryMap = {
report: () => {
console.info(`Deleting documents from ${this.collectionName} for date ${this.date}`)
return collectionRef.where('date', '==', this.date)
},
dict: () => {
console.info(`Deleting documents from ${this.collectionName}`)
return collectionRef
}
}

const queryBuilder = queryMap[this.collectionType]
if (!queryBuilder) {
throw new Error(`Invalid collection type: ${this.collectionType}`)
}

if (this.batchPromises.length > 0) {
await this.commitBatches()
return queryBuilder()
}

async getDocumentCount (query) {
try {
const countSnapshot = await query.count().get()
return countSnapshot.data().count
} catch (error) {
console.warn('Could not get document count for progress tracking:', error.message)
return 0
}
}

async batchDelete () {
console.info('Starting batch deletion...')
const startTime = Date.now()
this.currentBatch = []
this.batchPromises = []
this.reset()

let totalDocsDeleted = 0
const collectionRef = this.firestore.collection(this.collectionName)
const collectionQuery = this.buildQuery(collectionRef)

let collectionQuery
if (this.collectionType === 'report') {
console.info('Deleting documents from ' + this.collectionName + ' for date ' + this.date)
// Query to fetch monthly documents
collectionQuery = collectionRef.where('date', '==', this.date)
} else if (this.collectionType === 'dict') {
console.info('Deleting documents from ' + this.collectionName)
collectionQuery = collectionRef
} else {
throw new Error('Invalid collection type')
// Get total count for progress tracking
this.totalDocs = await this.getDocumentCount(collectionQuery)
if (this.totalDocs > 0) {
console.info(`Total documents to delete: ${this.totalDocs}`)
}

while (true) {
const snapshot = await collectionQuery.limit(this.batchSize * this.maxConcurrentBatches).get()
if (snapshot.empty) {
break
}
// Create BulkWriter for delete operations
this.bulkWriter = this.createBulkWriter('delet')

for await (const doc of snapshot.docs) {
this.currentBatch.push(doc)
let deletedCount = 0
const batchSize = this.config.flushThreshold // Process documents in chunks

if (this.currentBatch.length >= this.batchSize) {
this.queueBatch('delete')
}
if (this.batchPromises.length >= this.maxConcurrentBatches) {
await this.commitBatches()
}
totalDocsDeleted++
}
while (deletedCount < this.totalDocs || this.totalDocs === 0) {
const snapshot = await collectionQuery.limit(batchSize).get()
if (snapshot.empty) break

// Add all delete operations to BulkWriter
snapshot.docs.forEach(doc => {
this.bulkWriter.delete(doc.ref)
deletedCount++
})

// Periodically flush to manage memory
// if (deletedCount % this.config.flushThreshold === 0) {
console.log(`Flushing BulkWriter at ${deletedCount} operations...`)
await this.bulkWriter.flush()
// }
}
await this.finalFlush('delete')

// Final flush and close
console.log('Finalizing deletion operations...')
await this.bulkWriter.close()

const duration = (Date.now() - startTime) / 1000
console.info(`Deletion complete. Total docs deleted: ${totalDocsDeleted}. Time: ${duration} seconds`)
console.info(`Deletion complete. Total docs deleted: ${this.processedDocs}. Time: ${duration} seconds`)
}

/**
* Streams BigQuery query results into a Firestore collection using batch commits.
* @param {string} query - The BigQuery SQL query.
*/
async streamFromBigQuery (rowStream) {
console.info('Starting BigQuery to Firestore transfer...')
const startTime = Date.now()
let totalRowsProcessed = 0
this.reset()

this.currentBatch = []
this.batchPromises = []
// Create BulkWriter for write operations
this.bulkWriter = this.createBulkWriter('writ')

let rowCount = 0
const collectionRef = this.firestore.collection(this.collectionName)

for await (const row of rowStream) {
this.currentBatch.push(row)
// Add document to BulkWriter
const docRef = collectionRef.doc()
this.bulkWriter.set(docRef, row)

// Write batch when it reaches specified size
if (this.currentBatch.length >= this.batchSize) {
this.queueBatch('set')
}
rowCount++
this.totalDocs = rowCount // Update total as we go since we can't predict BigQuery result size

if (this.batchPromises.length >= this.maxConcurrentBatches) {
await this.commitBatches()
// Periodically flush to manage memory
if (rowCount % this.config.flushThreshold === 0) {
console.log(`Flushing BulkWriter at ${rowCount} operations...`)
await this.bulkWriter.flush()
}
totalRowsProcessed++
}
await this.finalFlush('set')

// Final flush and close
console.log('Finalizing write operations...')
await this.bulkWriter.close()

const duration = (Date.now() - startTime) / 1000
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${totalRowsProcessed}. Time: ${duration} seconds`)
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${this.processedDocs}. Time: ${duration} seconds`)
}

async export (query, exportConfig) {
// Configure Firestore settings
this.firestore.settings({
databaseId: exportConfig.database
databaseId: exportConfig.database,
timeout: this.config.timeout
})

// Set instance properties
Object.assign(this, {
collectionName: exportConfig.collection,
collectionType: exportConfig.type,
date: exportConfig.date
})
this.collectionName = exportConfig.collection
this.collectionType = exportConfig.type
this.date = exportConfig.date

await this.batchDelete()

Expand Down
2 changes: 1 addition & 1 deletion infra/bigquery-export/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"main": "index.js",
"scripts": {
"start": "node index.js",
"buildpack": "rm -rf node_modules; gcloud builds submit --pack image=us.gcr.io/httparchive/cloud-run/bigquery-export"
"build": "gcloud builds submit"
},
"type": "module",
"dependencies": {
Expand Down
Loading