A bit of context: I am a co-founder of a company where we aggregate tons of data. Data is aggregated from thousands of different sources in batches, i.e. a job scheduler orchestrates when to collect data from which source. During the data aggregation process we produce an obscene amount of logs (about 2 GB/hour). I found that, for the most part, the logs of successfully executed jobs are completely useless. Therefore, we throw away all logs of successfully executed tasks and only record logs for failed tasks. To do this, we use output-interceptor to capture the logs of the task execution and (in case of an error) record them to the database along with the summary of the task execution, i.e. structurally the data aggregation task looks like this:

```javascript
const runTask = async (id) => {
  const interceptOutput = createOutputInterceptor();

  let taskExecutionResult;
  let taskExecutionError;

  try {
    taskExecutionResult = await interceptOutput(() => {
      // ... do the data aggregation task
    });
  } catch (error) {
    taskExecutionError = error;
  }

  if (taskExecutionResult) {
    await recordTaskExecution(id, null, null, taskExecutionResult);
  } else {
    await recordTaskExecution(id, taskExecutionError, interceptOutput.output, null);
  }
};

runTask(1);
```

Currently we store the captured stdout/stderr as plain text in a TOASTed PostgreSQL column along with the summary of the task execution. The benefit of this approach over logging to ELK or a similar log aggregator is that it enables quick association of the database records with the logs.
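To illustrate the capture mechanism the snippet above relies on, here is a minimal sketch of how stdout interception can work for a synchronous routine. This is not the output-interceptor implementation (the real package scopes capture to a single asynchronous task); `interceptSync` is a hypothetical toy that simply patches `process.stdout.write` for the duration of the callback:

```javascript
// Toy sketch: capture everything written to stdout while `routine` runs.
// Assumption: the routine is synchronous; output-interceptor itself handles
// async tasks, which this simplified version does not attempt.
const interceptSync = (routine) => {
  const originalWrite = process.stdout.write.bind(process.stdout);

  let captured = '';

  // Temporarily replace process.stdout.write to accumulate output.
  process.stdout.write = (chunk) => {
    captured += chunk.toString();

    return true;
  };

  try {
    routine();
  } finally {
    // Always restore the original writer, even if the routine throws.
    process.stdout.write = originalWrite;
  }

  return captured;
};
```

Anything logged via `console.log` inside the callback ends up in the returned string instead of the terminal, which is what makes "discard on success, persist on failure" possible.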
The downside is that it creates significant disk IO, RDBMSes are not designed for such inputs, and you need to design a custom dashboard for viewing the logs.

Therefore, I was thinking of creating an open-source project (and perhaps a hosted service) that abstracts selectively capturing part of Node.js stdout, parsing it (in case of structured logs), storing it for a selected time frame, and enabling browsing the logs/stats, i.e.:

```javascript
const runTask = async (id) => {
  const interceptOutput = createOutputInterceptor();

  let taskExecutionResult;
  let taskExecutionError;

  try {
    taskExecutionResult = await interceptOutput(() => {
      // ... do the data aggregation task
    });
  } catch (error) {
    taskExecutionError = error;
  }

  if (taskExecutionResult) {
    await recordTaskExecution(id, null, null, taskExecutionResult);
  } else {
    const logId = await log(interceptOutput.output, {
      tags: ['data-aggregation'],
      expires: '1 day'
    });

    await recordTaskExecution(id, taskExecutionError, logId, null);
  }
};

runTask(1);
```

The log ID would then be used to quickly access the snippet of logs associated with that job in a user-friendly log browser.
Submitted January 14, 2019 at 12:54PM by gajus0