Load images in multiple threads

This worsens CPU-only times by some five percent,
but can also make GPU-accelerated runtime twice as fast.
This commit is contained in:
Přemysl Eric Janouch 2024-01-18 00:54:40 +01:00
parent b4f28814b7
commit 36f6612603
Signed by: p
GPG Key ID: A0420B94F92B9493

View File

@ -6,13 +6,17 @@
#endif #endif
#include <algorithm> #include <algorithm>
#include <condition_variable>
#include <filesystem> #include <filesystem>
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include <mutex>
#include <queue>
#include <regex> #include <regex>
#include <set> #include <set>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <thread>
#include <tuple> #include <tuple>
#include <cstdio> #include <cstdio>
@ -435,6 +439,62 @@ add_providers(Ort::SessionOptions &options)
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct Thumbnailing {
std::mutex input_mutex;
std::condition_variable input_cv;
std::queue<std::string> input; // All input paths
int work = 0; // Number of images requested
std::mutex output_mutex;
std::condition_variable output_cv;
std::vector<Magick::Image> output; // Processed images
int done = 0; // Finished worker threads
};
static void
thumbnail(const Config &config, int64_t width, int64_t height,
Thumbnailing &ctx)
{
while (true) {
std::unique_lock<std::mutex> input_lock(ctx.input_mutex);
ctx.input_cv.wait(input_lock,
[&]{ return ctx.input.empty() || ctx.work; });
if (ctx.input.empty())
break;
auto path = ctx.input.front();
ctx.input.pop();
ctx.work--;
input_lock.unlock();
Magick::Image image;
try {
image = load(path, config, width, height);
if (height != image.rows() || width != image.columns())
throw std::runtime_error("tensor mismatch");
std::unique_lock<std::mutex> output_lock(ctx.output_mutex);
ctx.output.push_back(image);
output_lock.unlock();
ctx.output_cv.notify_all();
} catch (const std::exception &e) {
fprintf(stderr, "%s: %s\n", path.c_str(), e.what());
std::unique_lock<std::mutex> input_lock(ctx.input_mutex);
ctx.work++;
input_lock.unlock();
ctx.input_cv.notify_all();
}
}
std::unique_lock<std::mutex> output_lock(ctx.output_mutex);
ctx.done++;
output_lock.unlock();
ctx.output_cv.notify_all();
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static std::string static std::string
print_shape(const Ort::ConstTensorTypeAndShapeInfo &info) print_shape(const Ort::ConstTensorTypeAndShapeInfo &info)
{ {
@ -533,30 +593,34 @@ infer(Ort::Env &env, const char *path, const std::vector<std::string> &images)
return; return;
} }
// TODO: Image loading is heavily parallelizable. In theory. // By only parallelizing image loads here during batching,
std::vector<Magick::Image> batch; // they never compete for CPU time with inference.
for (const auto &filename : images) { Thumbnailing ctx;
Magick::Image image; for (const auto &path : images)
try { ctx.input.push(path);
image = load(filename, config, *width, *height); for (auto i = g.batch; i--; )
} catch (const std::exception &e) { std::thread(thumbnail, std::ref(config), *width, *height,
fprintf(stderr, "%s: %s\n", filename.c_str(), e.what()); std::ref(ctx)).detach();
continue;
}
if (*height != image.rows() || *width != image.columns()) { while (true) {
fprintf(stderr, "%s: %s\n", filename.c_str(), "tensor mismatch"); std::unique_lock<std::mutex> input_lock(ctx.input_mutex);
continue; ctx.work = g.batch;
} input_lock.unlock();
ctx.input_cv.notify_all();
batch.push_back(image); std::unique_lock<std::mutex> output_lock(ctx.output_mutex);
if (batch.size() == g.batch) { ctx.output_cv.wait(output_lock,
run(batch, config, session, shape); [&]{ return ctx.output.size() == g.batch || ctx.done == g.batch; });
batch.clear();
// It would be possible to add dummy entries to the batch,
// so that the model doesn't need to be rebuilt.
if (!ctx.output.empty()) {
run(ctx.output, config, session, shape);
ctx.output.clear();
} }
if (ctx.done == g.batch)
break;
} }
if (!batch.empty())
run(batch, config, session, shape);
} }
int int
@ -649,14 +713,19 @@ main(int argc, char *argv[])
paths.assign(argv + 1, argv + argc); paths.assign(argv + 1, argv + argc);
} }
// Load batched images in parallel (the first is for GM, the other for IM).
if (g.batch > 1) {
auto value = std::to_string(
std::max(std::thread::hardware_concurrency() / g.batch, 1L));
setenv("OMP_NUM_THREADS", value.c_str(), true);
setenv("MAGICK_THREAD_LIMIT", value.c_str(), true);
}
// XXX: GraphicsMagick initializes signal handlers here, // XXX: GraphicsMagick initializes signal handlers here,
// one needs to use MagickLib::InitializeMagickEx() // one needs to use MagickLib::InitializeMagickEx()
// with MAGICK_OPT_NO_SIGNAL_HANDER to prevent that. // with MAGICK_OPT_NO_SIGNAL_HANDER to prevent that.
// //
// ImageMagick conveniently has the opposite default. // ImageMagick conveniently has the opposite default.
//
// Once processing images in parallel, consider presetting
// OMP_NUM_THREADS=1 (GM) and/or MAGICK_THREAD_LIMIT=1 (IM).
Magick::InitializeMagick(nullptr); Magick::InitializeMagick(nullptr);
OrtLoggingLevel logging = g.debug > 1 OrtLoggingLevel logging = g.debug > 1