sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py (63 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Integration tests for Recommendations AI transforms.""" from __future__ import absolute_import import random import unittest from nose.plugins.attrib import attr import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.testing.util import is_not_empty # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from google.cloud import recommendationengine from apache_beam.ml.gcp import recommendations_ai except ImportError: recommendationengine = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports def extract_id(response): yield response["id"] def extract_event_type(response): yield response["event_type"] def extract_prediction(response): yield response[0]["results"] @attr('IT') @unittest.skipIf( recommendationengine is None, "Recommendations AI dependencies not installed.") class RecommendationAIIT(unittest.TestCase): def test_create_catalog_item(self): CATALOG_ITEM = { "id": str(int(random.randrange(100000))), "title": "Sample laptop", "description": "Indisputably the most fantastic laptop ever created.", "language_code": "en", "category_hierarchies": [{ "categories": ["Electronic", "Computers"] }] } with TestPipeline(is_integration_test=True) as p: output = ( p | 'Create data' >> beam.Create([CATALOG_ITEM]) | 'Create CatalogItem' >> recommendations_ai.CreateCatalogItem(project=p.get_option('project')) | beam.ParDo(extract_id) | beam.combiners.ToList()) assert_that(output, equal_to([[CATALOG_ITEM["id"]]])) def test_create_user_event(self): USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}} with TestPipeline(is_integration_test=True) as p: output = ( p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >> recommendations_ai.WriteUserEvent(project=p.get_option('project')) | beam.ParDo(extract_event_type) | beam.combiners.ToList()) assert_that(output, equal_to([[USER_EVENT["event_type"]]])) def test_create_predict(self): USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}} with TestPipeline(is_integration_test=True) as p: output = ( p | 'Create data' >> beam.Create([USER_EVENT]) | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent( project=p.get_option('project'), placement_id="recently_viewed_default") | beam.ParDo(extract_prediction)) assert_that(output, is_not_empty()) if __name__ == '__main__': print(recommendationengine.CatalogItem.__module__) unittest.main()