BigW Consortium Gitlab

Commit 12bc2d8d by Grzegorz Bizon

Merge branch 'proper-fix-for-artifacts-service' into 'master'

Add archive feature to trace Closes #43022 and gitlab-ee#4170 See merge request gitlab-org/gitlab-ce!17314
parents e5d03612 0ac13220
module Ci
class CreateTraceArtifactService < BaseService
def execute(job)
return if job.job_artifacts_trace
job.trace.read do |stream|
break unless stream.file?
clone_file!(stream.path, JobArtifactUploader.workhorse_upload_path) do |clone_path|
create_job_trace!(job, clone_path)
FileUtils.rm(stream.path)
end
end
end
private
def create_job_trace!(job, path)
File.open(path) do |stream|
job.create_job_artifacts_trace!(
project: job.project,
file_type: :trace,
file: stream)
end
end
def clone_file!(src_path, temp_dir)
FileUtils.mkdir_p(temp_dir)
Dir.mktmpdir('tmp-trace', temp_dir) do |dir_path|
temp_path = File.join(dir_path, "job.log")
FileUtils.copy(src_path, temp_path)
yield(temp_path)
end
end
end
end
......@@ -43,9 +43,9 @@
- pipeline_cache:expire_pipeline_cache
- pipeline_creation:create_pipeline
- pipeline_creation:run_pipeline_schedule
- pipeline_background:archive_trace
- pipeline_default:build_coverage
- pipeline_default:build_trace_sections
- pipeline_default:create_trace_artifact
- pipeline_default:pipeline_metrics
- pipeline_default:pipeline_notification
- pipeline_default:update_head_pipeline_for_merge_request
......
class CreateTraceArtifactWorker
class ArchiveTraceWorker
include ApplicationWorker
include PipelineQueue
include PipelineBackgroundQueue
def perform(job_id)
Ci::Build.preload(:project, :user).find_by(id: job_id).try do |job|
Ci::CreateTraceArtifactService.new(job.project, job.user).execute(job)
Ci::Build.find_by(id: job_id).try do |job|
job.trace.archive!
end
end
end
......@@ -12,7 +12,7 @@ class BuildFinishedWorker
# We execute that async as this are two indepentent operations that can be executed after TraceSections and Coverage
BuildHooksWorker.perform_async(build.id)
CreateTraceArtifactWorker.perform_async(build.id)
ArchiveTraceWorker.perform_async(build.id)
end
end
end
##
# Concern for setting Sidekiq settings for the low priority CI pipeline workers.
#
module PipelineBackgroundQueue
extend ActiveSupport::Concern
included do
queue_namespace :pipeline_background
end
end
---
title: Add archive feature to trace
merge_request: 17314
author:
type: added
......@@ -69,3 +69,4 @@
- [storage_migrator, 1]
- [pages_domain_verification, 1]
- [plugin, 1]
- [pipeline_background, 1]
class MigrateCreateTraceArtifactSidekiqQueue < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
sidekiq_queue_migrate 'pipeline_default:create_trace_artifact', to: 'pipeline_background:archive_trace'
end
def down
sidekiq_queue_migrate 'pipeline_background:archive_trace', to: 'pipeline_default:create_trace_artifact'
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20180305144721) do
ActiveRecord::Schema.define(version: 20180306074045) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......
module Gitlab
module Ci
class Trace
ArchiveError = Class.new(StandardError)
attr_reader :job
delegate :old_trace, to: :job
......@@ -93,8 +95,52 @@ module Gitlab
job.erase_old_trace!
end
def archive!
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
if current_path
File.open(current_path) do |stream|
archive_stream!(stream)
FileUtils.rm(current_path)
end
elsif old_trace
StringIO.new(old_trace, 'rb').tap do |stream|
archive_stream!(stream)
job.erase_old_trace!
end
end
end
private
def archive_stream!(stream)
clone_file!(stream, JobArtifactUploader.workhorse_upload_path) do |clone_path|
create_job_trace!(job, clone_path)
end
end
def clone_file!(src_stream, temp_dir)
FileUtils.mkdir_p(temp_dir)
Dir.mktmpdir('tmp-trace', temp_dir) do |dir_path|
temp_path = File.join(dir_path, "job.log")
FileUtils.touch(temp_path)
size = IO.copy_stream(src_stream, temp_path)
raise ArchiveError, 'Failed to copy stream' unless size == src_stream.size
yield(temp_path)
end
end
def create_job_trace!(job, path)
File.open(path) do |stream|
job.create_job_artifacts_trace!(
project: job.project,
file_type: :trace,
file: stream)
end
end
def ensure_path
return current_path if current_path
......
require 'logger'
require 'resolv-replace'
desc "GitLab | Archive legacy traces to trace artifacts"
namespace :gitlab do
namespace :traces do
task archive: :environment do
logger = Logger.new(STDOUT)
logger.info('Archiving legacy traces')
Ci::Build.finished
.where('NOT EXISTS (?)',
Ci::JobArtifact.select(1).trace.where('ci_builds.id = ci_job_artifacts.job_id'))
.order(id: :asc)
.find_in_batches(batch_size: 1000) do |jobs|
job_ids = jobs.map { |job| [job.id] }
ArchiveTraceWorker.bulk_perform_async(job_ids)
logger.info("Scheduled #{job_ids.count} jobs. From #{job_ids.min} to #{job_ids.max}")
end
end
end
end
......@@ -399,4 +399,138 @@ describe Gitlab::Ci::Trace do
end
end
end
describe '#archive!' do
subject { trace.archive! }
shared_examples 'archive trace file' do
it do
expect { subject }.to change { Ci::JobArtifact.count }.by(1)
build.reload
expect(build.trace.exist?).to be_truthy
expect(build.job_artifacts_trace.file.exists?).to be_truthy
expect(build.job_artifacts_trace.file.filename).to eq('job.log')
expect(File.exist?(src_path)).to be_falsy
expect(src_checksum)
.to eq(Digest::SHA256.file(build.job_artifacts_trace.file.path).digest)
end
end
shared_examples 'source trace file stays intact' do |error:|
it do
expect { subject }.to raise_error(error)
build.reload
expect(build.trace.exist?).to be_truthy
expect(build.job_artifacts_trace).to be_nil
expect(File.exist?(src_path)).to be_truthy
end
end
shared_examples 'archive trace in database' do
it do
expect { subject }.to change { Ci::JobArtifact.count }.by(1)
build.reload
expect(build.trace.exist?).to be_truthy
expect(build.job_artifacts_trace.file.exists?).to be_truthy
expect(build.job_artifacts_trace.file.filename).to eq('job.log')
expect(build.old_trace).to be_nil
expect(src_checksum)
.to eq(Digest::SHA256.file(build.job_artifacts_trace.file.path).digest)
end
end
shared_examples 'source trace in database stays intact' do |error:|
it do
expect { subject }.to raise_error(error)
build.reload
expect(build.trace.exist?).to be_truthy
expect(build.job_artifacts_trace).to be_nil
expect(build.old_trace).to eq(trace_content)
end
end
context 'when job does not have trace artifact' do
context 'when trace file stored in default path' do
let!(:build) { create(:ci_build, :success, :trace_live) }
let!(:src_path) { trace.read { |s| return s.path } }
let!(:src_checksum) { Digest::SHA256.file(src_path).digest }
it_behaves_like 'archive trace file'
context 'when failed to create clone file' do
before do
allow(IO).to receive(:copy_stream).and_return(0)
end
it_behaves_like 'source trace file stays intact', error: Gitlab::Ci::Trace::ArchiveError
end
context 'when failed to create job artifact record' do
before do
allow_any_instance_of(Ci::JobArtifact).to receive(:save).and_return(false)
allow_any_instance_of(Ci::JobArtifact).to receive_message_chain(:errors, :full_messages)
.and_return(%w[Error Error])
end
it_behaves_like 'source trace file stays intact', error: ActiveRecord::RecordInvalid
end
end
context 'when trace is stored in database' do
let(:build) { create(:ci_build, :success) }
let(:trace_content) { 'Sample trace' }
let!(:src_checksum) { Digest::SHA256.digest(trace_content) }
before do
build.update_column(:trace, trace_content)
end
it_behaves_like 'archive trace in database'
context 'when failed to create clone file' do
before do
allow(IO).to receive(:copy_stream).and_return(0)
end
it_behaves_like 'source trace in database stays intact', error: Gitlab::Ci::Trace::ArchiveError
end
context 'when failed to create job artifact record' do
before do
allow_any_instance_of(Ci::JobArtifact).to receive(:save).and_return(false)
allow_any_instance_of(Ci::JobArtifact).to receive_message_chain(:errors, :full_messages)
.and_return(%w[Error Error])
end
it_behaves_like 'source trace in database stays intact', error: ActiveRecord::RecordInvalid
end
end
end
context 'when job has trace artifact' do
before do
create(:ci_job_artifact, :trace, job: build)
end
it 'does not archive' do
expect_any_instance_of(described_class).not_to receive(:archive_stream!)
expect { subject }.to raise_error('Already archived')
expect(build.job_artifacts_trace.file.exists?).to be_truthy
end
end
context 'when job is not finished yet' do
let!(:build) { create(:ci_build, :running, :trace_live) }
it 'does not archive' do
expect_any_instance_of(described_class).not_to receive(:archive_stream!)
expect { subject }.to raise_error('Job is not finished yet')
expect(build.trace.exist?).to be_truthy
end
end
end
end
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20180306074045_migrate_create_trace_artifact_sidekiq_queue.rb')
describe MigrateCreateTraceArtifactSidekiqQueue, :sidekiq, :redis do
include Gitlab::Database::MigrationHelpers
context 'when there are jobs in the queues' do
it 'correctly migrates queue when migrating up' do
Sidekiq::Testing.disable! do
stubbed_worker(queue: 'pipeline_default:create_trace_artifact').perform_async('Something', [1])
stubbed_worker(queue: 'pipeline_background:archive_trace').perform_async('Something', [1])
described_class.new.up
expect(sidekiq_queue_length('pipeline_default:create_trace_artifact')).to eq 0
expect(sidekiq_queue_length('pipeline_background:archive_trace')).to eq 2
end
end
it 'does not affect other queues under the same namespace' do
Sidekiq::Testing.disable! do
stubbed_worker(queue: 'pipeline_default:build_coverage').perform_async('Something', [1])
stubbed_worker(queue: 'pipeline_default:build_trace_sections').perform_async('Something', [1])
stubbed_worker(queue: 'pipeline_default:pipeline_metrics').perform_async('Something', [1])
stubbed_worker(queue: 'pipeline_default:pipeline_notification').perform_async('Something', [1])
stubbed_worker(queue: 'pipeline_default:update_head_pipeline_for_merge_request').perform_async('Something', [1])
described_class.new.up
expect(sidekiq_queue_length('pipeline_default:build_coverage')).to eq 1
expect(sidekiq_queue_length('pipeline_default:build_trace_sections')).to eq 1
expect(sidekiq_queue_length('pipeline_default:pipeline_metrics')).to eq 1
expect(sidekiq_queue_length('pipeline_default:pipeline_notification')).to eq 1
expect(sidekiq_queue_length('pipeline_default:update_head_pipeline_for_merge_request')).to eq 1
end
end
it 'correctly migrates queue when migrating down' do
Sidekiq::Testing.disable! do
stubbed_worker(queue: 'pipeline_background:archive_trace').perform_async('Something', [1])
described_class.new.down
expect(sidekiq_queue_length('pipeline_default:create_trace_artifact')).to eq 1
expect(sidekiq_queue_length('pipeline_background:archive_trace')).to eq 0
end
end
end
context 'when there are no jobs in the queues' do
it 'does not raise error when migrating up' do
expect { described_class.new.up }.not_to raise_error
end
it 'does not raise error when migrating down' do
expect { described_class.new.down }.not_to raise_error
end
end
def stubbed_worker(queue:)
Class.new do
include Sidekiq::Worker
sidekiq_options queue: queue
end
end
end
......@@ -698,10 +698,10 @@ describe API::Runner do
end
end
context 'when tace is given' do
context 'when trace is given' do
it 'creates a trace artifact' do
allow(BuildFinishedWorker).to receive(:perform_async).with(job.id) do
CreateTraceArtifactWorker.new.perform(job.id)
ArchiveTraceWorker.new.perform(job.id)
end
update_job(state: 'success', trace: 'BUILD TRACE UPDATED')
......
require 'spec_helper'
describe Ci::CreateTraceArtifactService do
describe '#execute' do
subject { described_class.new(nil, nil).execute(job) }
context 'when the job does not have trace artifact' do
context 'when the job has a trace file' do
let!(:job) { create(:ci_build, :trace_live) }
let!(:legacy_path) { job.trace.read { |stream| return stream.path } }
let!(:legacy_checksum) { Digest::SHA256.file(legacy_path).hexdigest }
let(:new_path) { job.job_artifacts_trace.file.path }
let(:new_checksum) { Digest::SHA256.file(new_path).hexdigest }
it { expect(File.exist?(legacy_path)).to be_truthy }
it 'creates trace artifact' do
expect { subject }.to change { Ci::JobArtifact.count }.by(1)
expect(File.exist?(legacy_path)).to be_falsy
expect(File.exist?(new_path)).to be_truthy
expect(new_checksum).to eq(legacy_checksum)
expect(job.job_artifacts_trace.file.exists?).to be_truthy
expect(job.job_artifacts_trace.file.filename).to eq('job.log')
end
context 'when failed to create trace artifact record' do
before do
# When ActiveRecord error happens
allow_any_instance_of(Ci::JobArtifact).to receive(:save).and_return(false)
allow_any_instance_of(Ci::JobArtifact).to receive_message_chain(:errors, :full_messages)
.and_return("Error")
subject rescue nil
job.reload
end
it 'keeps legacy trace and removes trace artifact' do
expect(File.exist?(legacy_path)).to be_truthy
expect(job.job_artifacts_trace).to be_nil
end
end
end
context 'when the job does not have a trace file' do
let!(:job) { create(:ci_build) }
it 'does not create trace artifact' do
expect { subject }.not_to change { Ci::JobArtifact.count }
end
end
end
context 'when the job has already had trace artifact' do
let!(:job) { create(:ci_build, :trace_artifact) }
it 'does not create trace artifact' do
expect { subject }.not_to change { Ci::JobArtifact.count }
end
end
end
end
require 'rake_helper'
describe 'gitlab:traces rake tasks' do
before do
Rake.application.rake_require 'tasks/gitlab/traces'
end
shared_examples 'passes the job id to worker' do
it do
expect(ArchiveTraceWorker).to receive(:bulk_perform_async).with([[job.id]])
run_rake_task('gitlab:traces:archive')
end
end
shared_examples 'does not pass the job id to worker' do
it do
expect(ArchiveTraceWorker).not_to receive(:bulk_perform_async)
run_rake_task('gitlab:traces:archive')
end
end
context 'when trace file stored in default path' do
let!(:job) { create(:ci_build, :success, :trace_live) }
it_behaves_like 'passes the job id to worker'
end
context 'when trace is stored in database' do
let!(:job) { create(:ci_build, :success) }
before do
job.update_column(:trace, 'trace in db')
end
it_behaves_like 'passes the job id to worker'
end
context 'when job has trace artifact' do
let!(:job) { create(:ci_build, :success) }
before do
create(:ci_job_artifact, :trace, job: job)
end
it_behaves_like 'does not pass the job id to worker'
end
context 'when job is not finished yet' do
let!(:build) { create(:ci_build, :running, :trace_live) }
it_behaves_like 'does not pass the job id to worker'
end
end
require 'spec_helper'
describe CreateTraceArtifactWorker do
describe ArchiveTraceWorker do
describe '#perform' do
subject { described_class.new.perform(job&.id) }
......@@ -8,8 +8,7 @@ describe CreateTraceArtifactWorker do
let(:job) { create(:ci_build) }
it 'executes service' do
expect_any_instance_of(Ci::CreateTraceArtifactService)
.to receive(:execute).with(job)
expect_any_instance_of(Gitlab::Ci::Trace).to receive(:archive!)
subject
end
......@@ -19,8 +18,7 @@ describe CreateTraceArtifactWorker do
let(:job) { nil }
it 'does not execute service' do
expect_any_instance_of(Ci::CreateTraceArtifactService)
.not_to receive(:execute)
expect_any_instance_of(Gitlab::Ci::Trace).not_to receive(:archive!)
subject
end
......
......@@ -14,7 +14,7 @@ describe BuildFinishedWorker do
expect_any_instance_of(BuildTraceSectionsWorker).to receive(:perform)
expect_any_instance_of(BuildCoverageWorker).to receive(:perform)
expect(BuildHooksWorker).to receive(:perform_async)
expect(CreateTraceArtifactWorker).to receive(:perform_async)
expect(ArchiveTraceWorker).to receive(:perform_async)
described_class.new.perform(build.id)
end
......
require 'spec_helper'
describe PipelineBackgroundQueue do
let(:worker) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
include PipelineBackgroundQueue
end
end
it 'sets a default object storage queue automatically' do
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_background:dummy'
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment