Fix OMP column reduction kernel #1369

Merged · 7 commits · Aug 2, 2023
include/ginkgo/core/base/range.hpp (2 changes: 1 addition & 1 deletion)
@@ -864,7 +864,7 @@ GKO_BIND_UNARY_RANGE_OPERATION_TO_OPERATOR(transpose_operation, transpose);


#define GKO_DEPRECATED_SIMPLE_BINARY_OPERATION(_deprecated_name, _name) \
struct [[deprecated("Please use " #_name)]] _deprecated_name : _name{};
struct [[deprecated("Please use " #_name)]] _deprecated_name : _name {}

#define GKO_DEFINE_SIMPLE_BINARY_OPERATION(_name, ...) \
struct _name { \
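The only change to this file drops the trailing semicolon from the macro body. A plausible reading, not stated in the diff itself: with the `;` baked into the expansion, an invocation that is itself followed by `;` produces a stray empty declaration, which pedantic builds warn about; without it, the call site supplies the semicolon like any other declaration. A minimal sketch of the macro after this change, applied to a hypothetical operation type (`my_op` and `my_old_op` are illustrations, not Ginkgo names):

// hypothetical base operation type, for illustration only
struct my_op {};

#define GKO_DEPRECATED_SIMPLE_BINARY_OPERATION(_deprecated_name, _name) \
    struct [[deprecated("Please use " #_name)]] _deprecated_name : _name {}

// expands to: struct [[deprecated("Please use my_op")]] my_old_op : my_op {};
GKO_DEPRECATED_SIMPLE_BINARY_OPERATION(my_old_op, my_op);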
omp/base/kernel_launch_reduction.hpp (139 changes: 80 additions & 59 deletions)
@@ -62,9 +62,11 @@ void run_kernel_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
ValueType* result, size_type size,
array<char>& tmp, MappedKernelArgs... args)
{
const auto num_threads = static_cast<int64>(omp_get_max_threads());
const auto ssize = static_cast<int64>(size);
const auto work_per_thread = ceildiv(ssize, num_threads);
// Limit the number of threads to the number of columns
const auto num_threads = std::min<int64>(omp_get_max_threads(), ssize);
const auto work_per_thread =
ceildiv(ssize, std::max<int64>(num_threads, 1));
const auto required_storage = sizeof(ValueType) * num_threads;
if (tmp.get_num_elems() < required_storage) {
tmp.resize_and_reset(required_storage);
@@ -73,14 +75,17 @@ void run_kernel_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
#pragma omp parallel num_threads(num_threads)
{
const auto thread_id = omp_get_thread_num();
const auto begin = thread_id * work_per_thread;
const auto end = std::min(ssize, begin + work_per_thread);
if (thread_id < num_threads) {
const auto begin = thread_id * work_per_thread;
const auto end = std::min(ssize, begin + work_per_thread);

auto local_partial = identity;
for (auto i = begin; i < end; i++) {
local_partial = op(local_partial, fn(i, map_to_device(args)...));
auto local_partial = identity;
for (auto i = begin; i < end; i++) {
local_partial =
op(local_partial, fn(i, map_to_device(args)...));
}
partial[thread_id] = local_partial;
}
partial[thread_id] = local_partial;
}
*result =
finalize(std::accumulate(partial, partial + num_threads, identity, op));
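The change above caps the OpenMP team at the problem size, guards each thread's work with `if (thread_id < num_threads)`, and clamps the `ceildiv` divisor with `std::max<int64>(num_threads, 1)` so an empty input cannot divide by zero. A self-contained sketch of the same pattern outside Ginkgo; the function name, the plain `+` reduction, and the `std::vector` scratch buffer are assumptions for illustration:

#include <omp.h>
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

// per-thread partials with a capped team size, reduced serially at the end
std::int64_t reduce_sum(const std::vector<std::int64_t>& data)
{
    const auto size = static_cast<std::int64_t>(data.size());
    // cap the team at the number of elements, but keep it positive so the
    // num_threads clause stays valid even for an empty input
    const auto num_threads = std::max<std::int64_t>(
        std::min<std::int64_t>(omp_get_max_threads(), size), std::int64_t{1});
    const auto work_per_thread = (size + num_threads - 1) / num_threads;
    std::vector<std::int64_t> partial(num_threads, 0);
#pragma omp parallel num_threads(num_threads)
    {
        const auto thread_id = static_cast<std::int64_t>(omp_get_thread_num());
        if (thread_id < num_threads) {  // defensive guard, as in the kernel
            const auto begin = thread_id * work_per_thread;
            const auto end = std::min(size, begin + work_per_thread);
            std::int64_t local{};
            for (auto i = begin; i < end; i++) {
                local += data[i];
            }
            partial[thread_id] = local;
        }
    }
    return std::accumulate(partial.begin(), partial.end(), std::int64_t{});
}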
@@ -99,8 +104,9 @@ void run_kernel_reduction_sized_impl(syn::value_list<int, remainder_cols>,
{
const auto rows = static_cast<int64>(size[0]);
const auto cols = static_cast<int64>(size[1]);
const auto num_threads = static_cast<int64>(omp_get_max_threads());
const auto work_per_thread = ceildiv(rows, num_threads);
// Limit the number of threads to the number of columns
const auto num_threads = std::min<int64>(omp_get_max_threads(), rows);
const auto work_per_thread = ceildiv(rows, std::max<int64>(num_threads, 1));
const auto required_storage = sizeof(ValueType) * num_threads;
if (tmp.get_num_elems() < required_storage) {
tmp.resize_and_reset(required_storage);
@@ -109,43 +115,46 @@ void run_kernel_reduction_sized_impl(syn::value_list<int, remainder_cols>,
static_assert(remainder_cols < block_size, "remainder too large");
const auto rounded_cols = cols / block_size * block_size;
GKO_ASSERT(rounded_cols + remainder_cols == cols);
#pragma omp parallel
#pragma omp parallel num_threads(num_threads)
{
const auto thread_id = omp_get_thread_num();
const auto begin = thread_id * work_per_thread;
const auto end = std::min(rows, begin + work_per_thread);

auto local_partial = identity;
if (rounded_cols == 0 || cols == block_size) {
// we group all sizes <= block_size here and unroll explicitly
constexpr auto local_cols =
remainder_cols == 0 ? block_size : remainder_cols;
for (auto row = begin; row < end; row++) {
#pragma unroll
for (int64 col = 0; col < local_cols; col++) {
local_partial = op(local_partial, fn(row, col, args...));
}
}
} else {
// we operate in block_size blocks plus an explicitly unrolled
// remainder
for (auto row = begin; row < end; row++) {
for (int64 base_col = 0; base_col < rounded_cols;
base_col += block_size) {
if (thread_id < num_threads) {
const auto begin = thread_id * work_per_thread;
const auto end = std::min(rows, begin + work_per_thread);

auto local_partial = identity;
if (rounded_cols == 0 || cols == block_size) {
// we group all sizes <= block_size here and unroll explicitly
constexpr auto local_cols =
remainder_cols == 0 ? block_size : remainder_cols;
for (auto row = begin; row < end; row++) {
#pragma unroll
for (int64 i = 0; i < block_size; i++) {
for (int64 col = 0; col < local_cols; col++) {
local_partial =
op(local_partial, fn(row, base_col + i, args...));
op(local_partial, fn(row, col, args...));
}
}
} else {
// we operate in block_size blocks plus an explicitly unrolled
// remainder
for (auto row = begin; row < end; row++) {
for (int64 base_col = 0; base_col < rounded_cols;
base_col += block_size) {
#pragma unroll
for (int64 i = 0; i < remainder_cols; i++) {
local_partial =
op(local_partial, fn(row, rounded_cols + i, args...));
for (int64 i = 0; i < block_size; i++) {
local_partial = op(local_partial,
fn(row, base_col + i, args...));
}
}
#pragma unroll
for (int64 i = 0; i < remainder_cols; i++) {
local_partial = op(local_partial,
fn(row, rounded_cols + i, args...));
}
}
}
partial[thread_id] = local_partial;
}
partial[thread_id] = local_partial;
}
*result =
finalize(std::accumulate(partial, partial + num_threads, identity, op));
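This sized variant keeps the same capping and guarding scheme but walks the columns in compile-time `block_size` chunks with an explicitly unrolled remainder. A condensed sketch of that traversal; the helper name is hypothetical and the remainder is handled with a runtime loop instead of the kernel's compile-time `remainder_cols`:

#include <cstdint>

// visit every column index in [0, cols): full block_size chunks first,
// then the leftover columns
template <int block_size, typename Callable>
void for_each_col(std::int64_t cols, Callable fn)
{
    const auto rounded_cols = cols / block_size * block_size;
    for (std::int64_t base_col = 0; base_col < rounded_cols;
         base_col += block_size) {
#pragma unroll
        for (int i = 0; i < block_size; i++) {
            fn(base_col + i);  // full, unrollable block
        }
    }
    for (auto col = rounded_cols; col < cols; col++) {
        fn(col);  // remainder (compile-time remainder_cols in the kernel)
    }
}

A caller would instantiate it as, for example, `for_each_col<8>(cols, ...)` and accumulate inside the callback.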
@@ -210,12 +219,12 @@ void run_kernel_row_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
constexpr int block_size = 8;
const auto rows = static_cast<int64>(size[0]);
const auto cols = static_cast<int64>(size[1]);
const auto num_threads = static_cast<int64>(omp_get_max_threads());
const auto available_threads = static_cast<int64>(omp_get_max_threads());
if (rows <= 0) {
return;
}
// enough work to keep all threads busy or only very small reduction sizes
if (rows >= reduction_kernel_oversubscription * num_threads ||
if (rows >= reduction_kernel_oversubscription * available_threads ||
cols < rows) {
#pragma omp parallel for
for (int64 row = 0; row < rows; row++) {
@@ -229,36 +238,44 @@ void run_kernel_row_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
}
} else {
// small number of rows and large reduction sizes: do partial sum first
const auto work_per_thread = ceildiv(cols, num_threads);
const auto required_storage = sizeof(ValueType) * rows * num_threads;
const auto num_threads = std::min<int64>(available_threads, cols);
const auto work_per_thread =
ceildiv(cols, std::max<int64>(num_threads, 1));
const auto temp_elems_per_row = num_threads;
const auto required_storage =
sizeof(ValueType) * rows * temp_elems_per_row;
if (tmp.get_num_elems() < required_storage) {
tmp.resize_and_reset(required_storage);
}
const auto partial = reinterpret_cast<ValueType*>(tmp.get_data());
#pragma omp parallel num_threads(num_threads)
{
const auto thread_id = static_cast<int64>(omp_get_thread_num());
const auto begin = thread_id * work_per_thread;
const auto end = std::min(begin + work_per_thread, cols);
for (int64 row = 0; row < rows; row++) {
auto local_partial = identity;
for (int64 col = begin; col < end; col++) {
local_partial = op(local_partial, [&]() {
return fn(row, col, args...);
}());
if (thread_id < num_threads) {
const auto begin = thread_id * work_per_thread;
const auto end = std::min(begin + work_per_thread, cols);
for (int64 row = 0; row < rows; row++) {
auto local_partial = identity;
for (int64 col = begin; col < end; col++) {
local_partial = op(local_partial, [&]() {
return fn(row, col, args...);
}());
}
partial[row * temp_elems_per_row + thread_id] =
local_partial;
}
partial[row * num_threads + thread_id] = local_partial;
}
}
// then accumulate the partial sums and write to result
#pragma omp parallel for
for (int64 row = 0; row < rows; row++) {
[&] {
auto local_partial = identity;
for (int64 thread_id = 0; thread_id < num_threads;
for (int64 thread_id = 0; thread_id < temp_elems_per_row;
thread_id++) {
local_partial = op(local_partial,
partial[row * num_threads + thread_id]);
local_partial =
op(local_partial,
partial[row * temp_elems_per_row + thread_id]);
}
result[row * result_stride] = finalize(local_partial);
}();
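For a small number of rows with long rows, the kernel now splits the columns across a capped team, writes one partial per (row, thread) into a `rows * temp_elems_per_row` scratch array, and combines each row's partials in a second parallel pass. A self-contained sketch of that two-phase scheme; the plain sum, the row-major layout, and the names are assumptions for illustration:

#include <omp.h>
#include <algorithm>
#include <cstdint>
#include <vector>

// phase 1 writes partial[row * num_threads + tid],
// phase 2 combines each row's partials
std::vector<double> row_sums(const std::vector<double>& mat, std::int64_t rows,
                             std::int64_t cols)
{
    const auto num_threads = std::max<std::int64_t>(
        std::min<std::int64_t>(omp_get_max_threads(), cols), std::int64_t{1});
    const auto work_per_thread = (cols + num_threads - 1) / num_threads;
    std::vector<double> partial(rows * num_threads, 0.0);
    std::vector<double> result(rows, 0.0);
#pragma omp parallel num_threads(num_threads)
    {
        const auto tid = static_cast<std::int64_t>(omp_get_thread_num());
        const auto begin = tid * work_per_thread;
        const auto end = std::min(cols, begin + work_per_thread);
        for (std::int64_t row = 0; row < rows; row++) {
            double local{};
            for (auto col = begin; col < end; col++) {
                local += mat[row * cols + col];
            }
            partial[row * num_threads + tid] = local;
        }
    }
#pragma omp parallel for
    for (std::int64_t row = 0; row < rows; row++) {
        double acc{};
        for (std::int64_t tid = 0; tid < num_threads; tid++) {
            acc += partial[row * num_threads + tid];
        }
        result[row] = acc;
    }
    return result;
}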
@@ -302,12 +319,12 @@ void run_kernel_col_reduction_sized_impl(
{
const auto rows = static_cast<int64>(size[0]);
const auto cols = static_cast<int64>(size[1]);
const auto num_threads = static_cast<int64>(omp_get_max_threads());
const auto available_threads = static_cast<int64>(omp_get_max_threads());
static_assert(remainder_cols < block_size, "remainder too large");
GKO_ASSERT(cols % block_size == remainder_cols);
const auto num_col_blocks = ceildiv(cols, block_size);
// enough work to keep all threads busy or only very small reduction sizes
if (cols >= reduction_kernel_oversubscription * num_threads ||
if (cols >= reduction_kernel_oversubscription * available_threads ||
rows < cols) {
#pragma omp parallel for
for (int64 col_block = 0; col_block < num_col_blocks; col_block++) {
@@ -324,10 +341,14 @@
}
} else {
// number of blocks that need to be reduced afterwards
const auto reduction_size =
ceildiv(reduction_kernel_oversubscription * num_threads, cols);
const auto rows_per_thread = ceildiv(rows, reduction_size);
const auto required_storage = sizeof(ValueType) * rows * reduction_size;
// This reduction_size definition ensures we don't use more temporary
// storage than the input vector
const auto reduction_size = std::min(
rows, ceildiv(reduction_kernel_oversubscription * available_threads,
std::max<int64>(cols, 1)));
const auto rows_per_thread =
ceildiv(rows, std::max<int64>(reduction_size, 1));
const auto required_storage = sizeof(ValueType) * cols * reduction_size;
if (tmp.get_num_elems() < required_storage) {
tmp.resize_and_reset(required_storage);
}
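This hunk touches the kernel named in the PR title. `reduction_size` is now capped at `rows`, matching the new comment about not using more temporary storage than the input, and the scratch buffer is sized as `cols * reduction_size` (one partial per column for each row chunk) instead of `rows * reduction_size`. A small worked example; the 16 available threads and the oversubscription factor of 4 are assumed values, not read from this diff:

// Assumed: available_threads = 16, reduction_kernel_oversubscription = 4.
// Case A: rows = 1000, cols = 4
//   reduction_size   = min(1000, ceildiv(4 * 16, 4)) = 16 row chunks
//   required_storage = sizeof(ValueType) * 4 * 16    // cols * reduction_size
// Case B: rows = 8, cols = 2 (fewer rows than the uncapped chunk count)
//   reduction_size   = min(8, ceildiv(4 * 16, 2))    = 8 row chunks
//   required_storage = sizeof(ValueType) * 2 * 8
// In both cases the buffer holds exactly one partial per (row chunk, column),
// which is what the cap at rows and the switch to cols * reduction_size give.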