Skip to content
Snippets Groups Projects
preprocessing_pipeline_test.py 14.7 KiB
Newer Older
Daniel Ecer's avatar
Daniel Ecer committed
from contextlib import contextmanager
import logging
Daniel Ecer's avatar
Daniel Ecer committed
from mock import Mock, patch, DEFAULT
Daniel Ecer's avatar
Daniel Ecer committed

import pytest

import apache_beam as beam

from sciencebeam_gym.utils.collection import (
  extend_dict
)

from sciencebeam_gym.beam_utils.utils import (
  TransformAndLog
)

from sciencebeam_gym.beam_utils.testing import (
  BeamTest,
  TestPipeline,
  get_current_test_context,
  get_counter_value
Daniel Ecer's avatar
Daniel Ecer committed
)

from sciencebeam_gym.preprocess.preprocessing_pipeline import (
  parse_args,
  configure_pipeline,
  MetricCounters
Daniel Ecer's avatar
Daniel Ecer committed
)

PREPROCESSING_PIPELINE = 'sciencebeam_gym.preprocess.preprocessing_pipeline'

BASE_DATA_PATH = 'base'
PDF_PATH = '*/*.pdf'
XML_PATH = '*/*.xml'

PDF_FILE_1 = '1/file.pdf'
XML_FILE_1 = '1/file.xml'
PDF_FILE_2 = '2/file.pdf'
XML_FILE_2 = '2/file.xml'
PDF_XML_FILE_LIST_FILE_1 = 'pdf-xml-files.tsv'

def get_logger():
  return logging.getLogger(__name__)

def fake_content(path):
  return 'fake content: %s' % path

def fake_lxml_for_pdf(pdf, path, page_range=None):
  return 'fake lxml for pdf: %s (%s) [%s]' % (pdf, path, page_range)
Daniel Ecer's avatar
Daniel Ecer committed

fake_svg_page = lambda i=0: 'fake svg page: %d' % i
fake_pdf_png_page = lambda i=0: 'fake pdf png page: %d' % i
fake_block_png_page = lambda i=0: 'fake block png page: %d' % i

def get_global_tfrecords_mock():
  # workaround for mock that would get serialized/deserialized before being invoked
  return get_current_test_context().tfrecords_mock

@contextmanager
def patch_preprocessing_pipeline(**kwargs):
  always_mock = {
    'find_file_pairs_grouped_by_parent_directory_or_name',
    'read_all_from_path',
Daniel Ecer's avatar
Daniel Ecer committed
    'pdf_bytes_to_png_pages',
    'convert_pdf_bytes_to_lxml',
Daniel Ecer's avatar
Daniel Ecer committed
    'convert_and_annotate_lxml_content',
    'svg_page_to_blockified_png_bytes',
    'save_svg_roots',
    'save_pages',
    'evaluate_document_by_page',
    'ReadDictCsv'
  }
  tfrecords_mock = Mock(name='tfrecords_mock')

  def DummyWritePropsToTFRecord(file_path, extract_props):
    return TransformAndLog(beam.Map(
      lambda v: tfrecords_mock(file_path, list(extract_props(v)))
    ), log_fn=lambda x: get_logger().info('tfrecords: %s', x))
Daniel Ecer's avatar
Daniel Ecer committed

  with patch.multiple(
    PREPROCESSING_PIPELINE,
    WritePropsToTFRecord=DummyWritePropsToTFRecord,
    **{
      k: kwargs.get(k, DEFAULT)
      for k in always_mock
    }
  ) as mocks:
    get_current_test_context().mocks = mocks
    mocks['read_all_from_path'].side_effect = fake_content
    mocks['convert_pdf_bytes_to_lxml'].side_effect = fake_lxml_for_pdf
Daniel Ecer's avatar
Daniel Ecer committed
    yield extend_dict(
      mocks,
      {'tfrecords': tfrecords_mock}
    )

MIN_ARGV = [
  '--data-path=' + BASE_DATA_PATH,
  '--pdf-path=' + PDF_PATH,
  '--xml-path=' + XML_PATH,
  '--save-svg'
]

Daniel Ecer's avatar
Daniel Ecer committed
def get_default_args():
  return parse_args([
    '--data-path=' + BASE_DATA_PATH,
    '--pdf-path=' + PDF_PATH,
    '--xml-path=' + XML_PATH,
    '--save-svg'
  ])

def page_uri_suffix(page_no):
  return '#page%d' % page_no

Daniel Ecer's avatar
Daniel Ecer committed
def _expected_tfrecord_props(pdf_file, page_no=1):
  return {
    'input_uri': pdf_file + page_uri_suffix(page_no),
    'annotation_uri': pdf_file + '.annot' + page_uri_suffix(page_no),
Daniel Ecer's avatar
Daniel Ecer committed
    'input_image': fake_pdf_png_page(page_no),
    'annotation_image': fake_block_png_page(page_no),
    'page_no': page_no
Daniel Ecer's avatar
Daniel Ecer committed
  }

def _setup_mocks_for_pages(mocks, page_no_list, file_count=1):
  mocks['convert_and_annotate_lxml_content'].return_value = [
    fake_svg_page(i) for i in page_no_list
  ]
  mocks['pdf_bytes_to_png_pages'].return_value = [
    fake_pdf_png_page(i) for i in page_no_list
  ]
  mocks['svg_page_to_blockified_png_bytes'].side_effect = [
    fake_block_png_page(i)
    for _ in range(file_count)
    for i in page_no_list
  ]

Daniel Ecer's avatar
Daniel Ecer committed
@pytest.mark.slow
Daniel Ecer's avatar
Daniel Ecer committed
class TestConfigurePipeline(BeamTest):
  def test_should_pass_pdf_and_xml_patterns_to_find_file_pairs_grouped_by_parent_directory(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.base_data_path = 'base'
      opt.pdf_path = 'pdf'
      opt.xml_path = 'xml'
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = []
        configure_pipeline(p, opt)

      mocks['find_file_pairs_grouped_by_parent_directory_or_name'].assert_called_with(
        ['base/pdf', 'base/xml']
      )

  def test_should_pass_lxml_and_xml_patterns_to_find_file_pairs_grouped_by_parent_directory(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.base_data_path = 'base'
      opt.pdf_path = ''
      opt.lxml_path = 'lxml'
      opt.xml_path = 'xml'
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = []
        configure_pipeline(p, opt)

      mocks['find_file_pairs_grouped_by_parent_directory_or_name'].assert_called_with(
        ['base/lxml', 'base/xml']
      )

  def test_should_write_tfrecords_from_pdf_xml_file_list(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.pdf_path = None
      opt.xml_path = None
      opt.pdf_xml_file_list = '.temp/file-list.tsv'
      opt.save_tfrecords = True
      with TestPipeline() as p:
        mocks['ReadDictCsv'].return_value = beam.Create([{
          'source_url': PDF_FILE_1,
Daniel Ecer's avatar
Daniel Ecer committed
          'xml_url': XML_FILE_1
        }])
        _setup_mocks_for_pages(mocks, [1])
        configure_pipeline(p, opt)

      mocks['ReadDictCsv'].assert_called_with(opt.pdf_xml_file_list, limit=None)
      mocks['tfrecords'].assert_called_with(opt.output_path + '/data', [
        _expected_tfrecord_props(PDF_FILE_1)
      ])

  def test_should_write_multiple_tfrecords_from_pdf_xml_file_list(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.pdf_path = None
      opt.xml_path = None
      opt.pdf_xml_file_list = '.temp/file-list.tsv'
      opt.save_tfrecords = True
      with TestPipeline() as p:
        mocks['ReadDictCsv'].return_value = beam.Create([{
          'source_url': PDF_FILE_1,
Daniel Ecer's avatar
Daniel Ecer committed
          'xml_url': XML_FILE_1
        }, {
          'source_url': PDF_FILE_2,
Daniel Ecer's avatar
Daniel Ecer committed
          'xml_url': XML_FILE_2
        }])
        _setup_mocks_for_pages(mocks, [1], file_count=2)
        configure_pipeline(p, opt)

      mocks['ReadDictCsv'].assert_called_with(opt.pdf_xml_file_list, limit=None)
      for pdf_file in [PDF_FILE_1, PDF_FILE_2]:
        mocks['tfrecords'].assert_any_call(opt.output_path + '/data', [
          _expected_tfrecord_props(pdf_file)
        ])
      assert mocks['tfrecords'].call_count == 2

  def test_should_pass_limit_to_read_dict_csv(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.pdf_path = None
      opt.xml_path = None
      opt.pdf_xml_file_list = '.temp/file-list.tsv'
      opt.limit = 1
      opt.save_tfrecords = True
      with TestPipeline() as p:
        mocks['ReadDictCsv'].return_value = beam.Create([{
          'source_url': PDF_FILE_1,
Daniel Ecer's avatar
Daniel Ecer committed
          'xml_url': XML_FILE_1
        }])
        _setup_mocks_for_pages(mocks, [1])
        configure_pipeline(p, opt)

      mocks['ReadDictCsv'].assert_called_with(opt.pdf_xml_file_list, limit=opt.limit)
      assert mocks['tfrecords'].call_count == 1

  def test_should_pass_limit_to_find_file_pairs_grouped_by_parent_directory_or_name(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.base_data_path = 'base'
      opt.pdf_path = ''
      opt.lxml_path = 'lxml'
      opt.xml_path = 'xml'
      opt.save_tfrecords = True
      opt.limit = 1
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = [
          (PDF_FILE_1, XML_FILE_1),
          (PDF_FILE_2, XML_FILE_2)
        ]
        configure_pipeline(p, opt)

      mocks['tfrecords'].call_count == 1

  def test_should_write_tfrecords_from_pdf_xml_path(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.save_tfrecords = True
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = [
          (PDF_FILE_1, XML_FILE_1)
        ]
        _setup_mocks_for_pages(mocks, [1])
        configure_pipeline(p, opt)

      mocks['tfrecords'].assert_called_with(opt.output_path + '/data', [
        _expected_tfrecord_props(PDF_FILE_1)
      ])

  def test_should_write_multiple_tfrecords_and_count_pages(self):
Daniel Ecer's avatar
Daniel Ecer committed
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.save_tfrecords = True
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = [
          (PDF_FILE_1, XML_FILE_1)
        ]
        _setup_mocks_for_pages(mocks, [1, 2])
        configure_pipeline(p, opt)

        p_result = p.run()
        assert get_counter_value(p_result, MetricCounters.FILE_PAIR) == 1
        assert get_counter_value(p_result, MetricCounters.PAGE) == 2
        assert get_counter_value(p_result, MetricCounters.FILTERED_PAGE) is None

Daniel Ecer's avatar
Daniel Ecer committed
      mocks['tfrecords'].assert_called_with(opt.output_path + '/data', [
        _expected_tfrecord_props(PDF_FILE_1, page_no=i)
        for i in [1, 2]
      ])

  def test_should_not_write_tfrecord_below_annotation_threshold_and_count_pages(self):
Daniel Ecer's avatar
Daniel Ecer committed
    custom_mocks = dict(
      evaluate_document_by_page=lambda _: [{
        'percentage': {
          # low percentage of None (no annotation, include)
          None: 0.1
        }
      }, {
        'percentage': {
          # low percentage of None (no annotation, exclude)
          None: 0.9
        }
      }]
    )
    with patch_preprocessing_pipeline(**custom_mocks) as mocks:
      opt = get_default_args()
      opt.save_tfrecords = True
      opt.min_annotation_percentage = 0.5
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = [
          (PDF_FILE_1, XML_FILE_1)
        ]
        _setup_mocks_for_pages(mocks, [1, 2])
        configure_pipeline(p, opt)

        p_result = p.run()
        assert get_counter_value(p_result, MetricCounters.FILE_PAIR) == 1
        assert get_counter_value(p_result, MetricCounters.PAGE) == 2
        assert get_counter_value(p_result, MetricCounters.FILTERED_PAGE) == 1

Daniel Ecer's avatar
Daniel Ecer committed
      mocks['tfrecords'].assert_called_with(opt.output_path + '/data', [
        _expected_tfrecord_props(PDF_FILE_1, page_no=i)
        for i in [1]
      ])

  def test_should_only_process_selected_pages(self):
    with patch_preprocessing_pipeline() as mocks:
      opt = get_default_args()
      opt.save_tfrecords = True
      opt.save_png = True
      opt.pages = (1, 3)
      with TestPipeline() as p:
        mocks['find_file_pairs_grouped_by_parent_directory_or_name'].return_value = [
          (PDF_FILE_1, XML_FILE_1)
        ]
        _setup_mocks_for_pages(mocks, [1, 2])
        configure_pipeline(p, opt)

      assert mocks['convert_pdf_bytes_to_lxml'].called
      assert mocks['convert_pdf_bytes_to_lxml'].call_args[1].get('page_range') == opt.pages

      assert mocks['pdf_bytes_to_png_pages'].called
      assert mocks['pdf_bytes_to_png_pages'].call_args[1].get('page_range') == opt.pages

Daniel Ecer's avatar
Daniel Ecer committed
class TestParseArgs(object):
  def test_should_raise_error_without_arguments(self):
    with pytest.raises(SystemExit):
      parse_args([])

  def test_should_not_raise_error_with_minimum_arguments(self):
    parse_args(['--data-path=test', '--pdf-path=test', '--xml-path=test', '--save-svg'])

  def test_should_not_raise_error_with_lxml_path_instead_of_pdf_path(self):
    parse_args(['--data-path=test', '--lxml-path=test', '--xml-path=test', '--save-svg'])

  def test_should_raise_error_if_no_output_option_specified(self):
    with pytest.raises(SystemExit):
      parse_args(['--data-path=test', '--pdf-path=test', '--xml-path=test'])

  def test_should_raise_error_if_pdf_and_lxml_path_are_specified(self):
    with pytest.raises(SystemExit):
      parse_args([
        '--data-path=test', '--pdf-path=test', '--lxml-path=test', '--xml-path=test',
        '--save-svg'
      ])

  def test_should_raise_error_if_pdf_path_specified_without_xml_path(self):
    with pytest.raises(SystemExit):
      parse_args(['--data-path=test', '--pdf-path=test', '--save-svg'])

  def test_should_not_raise_error_if_pdf_xml_file_list_specified_without_xml_path(self):
    parse_args(['--data-path=test', '--pdf-xml-file-list=test', '--save-svg'])

  def test_should_not_raise_error_with_save_lxml_path_together_with_pdf_path(self):
    parse_args(['--data-path=test', '--pdf-path=test', '--save-lxml', '--xml-path=test'])

  def test_should_not_raise_error_with_save_lxml_path_together_with_pdf_xml_file_list(self):
    parse_args(['--data-path=test', '--pdf-xml-file-list=test', '--save-lxml', '--xml-path=test'])

  def test_should_raise_error_if_save_lxml_specified_without_pdf_path(self):
    with pytest.raises(SystemExit):
      parse_args(['--data-path=test', '--lxml-path=test', '--save-lxml', '--xml-path=test'])

  def test_should_raise_error_if_save_png_is_specified_without_pdf_path(self):
    with pytest.raises(SystemExit):
      parse_args(['--data-path=test', '--lxml-path=test', '--save-png', '--xml-path=test'])

  def test_should_not_raise_error_with_save_png_path_together_with_pdf_path(self):
    parse_args(['--data-path=test', '--pdf-path=test', '--save-png', '--xml-path=test'])

  def test_should_not_raise_error_with_save_png_path_together_with_pdf_xml_file_list(self):
    parse_args(['--data-path=test', '--pdf-xml-file-list=test', '--save-png', '--xml-path=test'])

  def test_should_raise_error_if_image_width_was_specified_without_image_height(self):
    with pytest.raises(SystemExit):
      parse_args([
        '--data-path=test', '--pdf-path=test', '--xml-path=test',
        '--save-png', '--image-width=100'
      ])

  def test_should_raise_error_if_image_height_was_specified_without_image_width(self):
    with pytest.raises(SystemExit):
      parse_args([
        '--data-path=test', '--pdf-path=test', '--xml-path=test',
        '--save-png', '--image-height=100'
      ])

  def test_should_not_raise_error_if_both_image_width_and_height_are_specified(self):
    parse_args([
      '--data-path=test', '--pdf-path=test', '--xml-path=test',
      '--save-png', '--image-width=100', '--image-height=100'
    ])

  def test_should_raise_error_if_save_tfrecords_specified_without_pdf_path(self):
    with pytest.raises(SystemExit):
      parse_args(['--data-path=test', '--lxml-path=test', '--xml-path=test', '--save-tfrecords'])

  def test_should_not_raise_error_if_save_tfrecords_specified_with_pdf_path(self):
    parse_args(['--data-path=test', '--pdf-path=test', '--xml-path=test', '--save-tfrecords'])

  def test_should_not_raise_error_if_save_tfrecords_specified_with_pdf_xml_file_list(self):
    parse_args([
      '--data-path=test', '--pdf-xml-file-list=test', '--xml-path=test', '--save-tfrecords'
    ])

  def test_should_have_none_page_range_by_default(self):
    assert parse_args(MIN_ARGV).pages is None

  def test_should_parse_pages_as_list(self):
    assert parse_args(MIN_ARGV + ['--pages=1-3']).pages == (1, 3)