BigW Consortium Gitlab

Commit ef9b9b5d by Douwe Maan

Merge branch 'grpc-unavailable-restart' into 'master'

Restart Unicorn and Sidekiq when GRPC throws 14:Endpoint read failed Closes gitaly#1036 See merge request gitlab-org/gitlab-ce!17293
parents e1de8a77 00675311
---
title: Restart Unicorn and Sidekiq when GRPC throws 14:Endpoint read failed
merge_request: 17293
author:
type: fixed
...@@ -10,7 +10,7 @@ Sidekiq.configure_server do |config| ...@@ -10,7 +10,7 @@ Sidekiq.configure_server do |config|
config.server_middleware do |chain| config.server_middleware do |chain|
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS']
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] chain.add Gitlab::SidekiqMiddleware::Shutdown
chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0' chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0'
chain.add Gitlab::SidekiqStatus::ServerMiddleware chain.add Gitlab::SidekiqStatus::ServerMiddleware
end end
......
...@@ -125,6 +125,8 @@ module Gitlab ...@@ -125,6 +125,8 @@ module Gitlab
kwargs = yield(kwargs) if block_given? kwargs = yield(kwargs) if block_given?
stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend
rescue GRPC::Unavailable => ex
handle_grpc_unavailable!(ex)
ensure ensure
duration = Gitlab::Metrics::System.monotonic_time - start duration = Gitlab::Metrics::System.monotonic_time - start
...@@ -135,6 +137,27 @@ module Gitlab ...@@ -135,6 +137,27 @@ module Gitlab
duration) duration)
end end
def self.handle_grpc_unavailable!(ex)
status = ex.to_status
raise ex unless status.details == 'Endpoint read failed'
# There is a bug in grpc 1.8.x that causes a client process to get stuck
# always raising '14:Endpoint read failed'. The only thing that we can
# do to recover is to restart the process.
#
# See https://gitlab.com/gitlab-org/gitaly/issues/1029
if Sidekiq.server?
raise Gitlab::SidekiqMiddleware::Shutdown::WantShutdown.new(ex.to_s)
else
# SIGQUIT requests a Unicorn worker to shut down gracefully after the current request.
Process.kill('QUIT', Process.pid)
end
raise ex
end
private_class_method :handle_grpc_unavailable!
def self.current_transaction_labels def self.current_transaction_labels
Gitlab::Metrics::Transaction.current&.labels || {} Gitlab::Metrics::Transaction.current&.labels || {}
end end
......
require 'mutex_m'
module Gitlab module Gitlab
module SidekiqMiddleware module SidekiqMiddleware
class MemoryKiller class Shutdown
extend Mutex_m
# Default the RSS limit to 0, meaning the MemoryKiller is disabled # Default the RSS limit to 0, meaning the MemoryKiller is disabled
MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
# Give Sidekiq 15 minutes of grace time after exceeding the RSS limit # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
...@@ -8,28 +12,68 @@ module Gitlab ...@@ -8,28 +12,68 @@ module Gitlab
# Wait 30 seconds for running jobs to finish during graceful shutdown # Wait 30 seconds for running jobs to finish during graceful shutdown
SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i
# Create a mutex used to ensure there will be only one thread waiting to # This exception can be used to request that the middleware start shutting down Sidekiq
# shut Sidekiq down WantShutdown = Class.new(StandardError)
MUTEX = Mutex.new
ShutdownWithoutRaise = Class.new(WantShutdown)
private_constant :ShutdownWithoutRaise
# For testing only, to avoid race conditions (?) in Rspec mocks.
attr_reader :trace
# We store the shutdown thread in a class variable to ensure that there
# can be only one shutdown thread in the process.
def self.create_shutdown_thread
mu_synchronize do
return unless @shutdown_thread.nil?
@shutdown_thread = Thread.new { yield }
end
end
# For testing only: so we can wait for the shutdown thread to finish.
def self.shutdown_thread
mu_synchronize { @shutdown_thread }
end
# For testing only: so that we can reset the global state before each test.
def self.clear_shutdown_thread
mu_synchronize { @shutdown_thread = nil }
end
def initialize
@trace = Queue.new if Rails.env.test?
end
def call(worker, job, queue) def call(worker, job, queue)
shutdown_exception = nil
begin
yield yield
check_rss!
rescue WantShutdown => ex
shutdown_exception = ex
end
current_rss = get_rss return unless shutdown_exception
return unless MAX_RSS > 0 && current_rss > MAX_RSS self.class.create_shutdown_thread do
do_shutdown(worker, job, shutdown_exception)
end
Thread.new do raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
# Return if another thread is already waiting to shut Sidekiq down end
return unless MUTEX.try_lock
private
Sidekiq.logger.warn "Sidekiq worker PID-#{pid} current RSS #{current_rss}"\ def do_shutdown(worker, job, shutdown_exception)
" exceeds maximum RSS #{MAX_RSS} after finishing job #{worker.class} JID-#{job['jid']}" Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
"#{worker.class} JID-#{job['jid']}"
Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later" Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"
# Wait `GRACE_TIME` to give the memory intensive job time to finish. # Wait `GRACE_TIME` to give the memory intensive job time to finish.
# Then, tell Sidekiq to stop fetching new jobs. # Then, tell Sidekiq to stop fetching new jobs.
wait_and_signal(GRACE_TIME, 'SIGSTP', 'stop fetching new jobs') wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')
# Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish. # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
# Then, tell Sidekiq to gracefully shut down by giving jobs a few more # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
...@@ -40,9 +84,15 @@ module Gitlab ...@@ -40,9 +84,15 @@ module Gitlab
# Wait for Sidekiq to shutdown gracefully, and kill it if it didn't. # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die') wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
end end
end
private def check_rss!
return unless MAX_RSS > 0
current_rss = get_rss
return unless current_rss > MAX_RSS
raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}")
end
def get_rss def get_rss
output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s) output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
...@@ -56,12 +106,28 @@ module Gitlab ...@@ -56,12 +106,28 @@ module Gitlab
sleep(time) sleep(time)
Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})" Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
Process.kill(signal, pid) kill(signal, pid)
end end
def pid def pid
Process.pid Process.pid
end end
def sleep(time)
if Rails.env.test?
@trace << [:sleep, time]
else
Kernel.sleep(time)
end
end
def kill(signal, pid)
if Rails.env.test?
@trace << [:kill, signal, pid]
else
Process.kill(signal, pid)
end
end
end end
end end
end end
require 'spec_helper' require 'spec_helper'
describe Gitlab::SidekiqMiddleware::MemoryKiller do describe Gitlab::SidekiqMiddleware::Shutdown do
subject { described_class.new } subject { described_class.new }
let(:pid) { 999 }
let(:pid) { Process.pid }
let(:worker) { double(:worker, class: 'TestWorker') } let(:worker) { double(:worker, class: 'TestWorker') }
let(:job) { { 'jid' => 123 } } let(:job) { { 'jid' => 123 } }
let(:queue) { 'test_queue' } let(:queue) { 'test_queue' }
let(:block) { proc { nil } }
def run def run
thread = subject.call(worker, job, queue) { nil } subject.call(worker, job, queue) { block.call }
thread&.join described_class.shutdown_thread&.join
end
def pop_trace
subject.trace.pop(true)
end end
before do before do
allow(subject).to receive(:get_rss).and_return(10.kilobytes) allow(subject).to receive(:get_rss).and_return(10.kilobytes)
allow(subject).to receive(:pid).and_return(pid) described_class.clear_shutdown_thread
end end
context 'when MAX_RSS is set to 0' do context 'when MAX_RSS is set to 0' do
...@@ -30,22 +35,26 @@ describe Gitlab::SidekiqMiddleware::MemoryKiller do ...@@ -30,22 +35,26 @@ describe Gitlab::SidekiqMiddleware::MemoryKiller do
end end
end end
def expect_shutdown_sequence
expect(pop_trace).to eq([:sleep, 15 * 60])
expect(pop_trace).to eq([:kill, 'SIGTSTP', pid])
expect(pop_trace).to eq([:sleep, 30])
expect(pop_trace).to eq([:kill, 'SIGTERM', pid])
expect(pop_trace).to eq([:sleep, 10])
expect(pop_trace).to eq([:kill, 'SIGKILL', pid])
end
context 'when MAX_RSS is exceeded' do context 'when MAX_RSS is exceeded' do
before do before do
stub_const("#{described_class}::MAX_RSS", 5.kilobytes) stub_const("#{described_class}::MAX_RSS", 5.kilobytes)
end end
it 'sends the STP, TERM and KILL signals at expected times' do it 'sends the TSTP, TERM and KILL signals at expected times' do
expect(subject).to receive(:sleep).with(15 * 60).ordered
expect(Process).to receive(:kill).with('SIGSTP', pid).ordered
expect(subject).to receive(:sleep).with(30).ordered
expect(Process).to receive(:kill).with('SIGTERM', pid).ordered
expect(subject).to receive(:sleep).with(10).ordered
expect(Process).to receive(:kill).with('SIGKILL', pid).ordered
run run
expect_shutdown_sequence
end end
end end
...@@ -60,4 +69,20 @@ describe Gitlab::SidekiqMiddleware::MemoryKiller do ...@@ -60,4 +69,20 @@ describe Gitlab::SidekiqMiddleware::MemoryKiller do
run run
end end
end end
context 'when WantShutdown is raised' do
let(:block) { proc { raise described_class::WantShutdown } }
it 'starts the shutdown sequence and re-raises the exception' do
expect { run }.to raise_exception(described_class::WantShutdown)
# We can't expect 'run' to have joined on the shutdown thread, because
# it hit an exception.
shutdown_thread = described_class.shutdown_thread
expect(shutdown_thread).not_to be_nil
shutdown_thread.join
expect_shutdown_sequence
end
end
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment