Я пытаюсь добавить параллельный загрузчик данных в torch-dataframe, чтобы добавить совместимость с torchnet. Я использовал tnt.ParallelDatasetIterator и изменил так, что:
- Базовый пакет загружается вне потоков
- Пакет сериализуется и отправляется в поток
- В потоке пакет десериализуется и преобразует пакетные данные в тензоры
- Тензоры возвращаются в таблице с ключами
input
иtarget
для соответствия tnt.Engine.
Проблема возникает при втором вызове enque
с ошибкой: .../torch_distro/install/bin/luajit: not enough memory
. В настоящее время я работаю только с mnist с адаптированным mnist-example. Цикл enque
теперь выглядит так (с отладочным выводом памяти):
-- `samplePlaceholder` stands in for samples which have been
-- filtered out by the `filter` function
local samplePlaceholder = {}
-- The enque does the main loop
local idx = 1
local function enqueue()
while idx <= size and threads:acceptsjob() do
local batch, reset = self.dataset:get_batch(batch_size)
if (reset) then
idx = size + 1
else
idx = idx + 1
end
if (batch) then
local serialized_batch = torch.serialize(batch)
-- In the parallel section only the to_tensor is run in parallel
-- this should though be the computationally expensive operation
threads:addjob(
function(argList)
io.stderr:write("\n Start");
io.stderr:write("\n 1: " ..tostring(collectgarbage("count")))
local origIdx, serialized_batch, samplePlaceholder = unpack(argList)
io.stderr:write("\n 2: " ..tostring(collectgarbage("count")))
local batch = torch.deserialize(serialized_batch)
serialized_batch = nil
collectgarbage()
collectgarbage()
io.stderr:write("\n 3: " .. tostring(collectgarbage("count")))
batch = transform(batch)
io.stderr:write("\n 4: " .. tostring(collectgarbage("count")))
local sample = samplePlaceholder
if (filter(batch)) then
sample = {}
sample.input, sample.target = batch:to_tensor()
end
io.stderr:write("\n 5: " ..tostring(collectgarbage("count")))
collectgarbage()
collectgarbage()
io.stderr:write("\n 6: " ..tostring(collectgarbage("count")))
io.stderr:write("\n End \n");
return {
sample,
origIdx
}
end,
function(argList)
sample, sampleOrigIdx = unpack(argList)
end,
{idx, serialized_batch, samplePlaceholder}
)
end
end
end
Я побрызгал collectgarbage
, а также попытался удалить все ненужные объекты. Вывод памяти довольно прост:
Start
1: 374840.87695312
2: 374840.94433594
3: 372023.79101562
4: 372023.85839844
5: 372075.41308594
6: 372023.73632812
End
Функция, которая зацикливает enque
, является тривиальной неупорядоченной функцией (ошибка памяти возникает во втором enque
и ):
iterFunction = function()
while threads:hasjob() do
enqueue()
threads:dojob()
if threads:haserror() then
threads:synchronize()
end
enqueue()
if table.exact_length(sample) > 0 then
return sample
end
end
end