libs/google/drive.py (99 lines of code) (raw):

import json from google.controller import GoogleApiController class GoogleDriveApi(GoogleApiController): def __init__(self, oauth): self.oauth = oauth self.service = self._get_service("drive", "v3") def get_files_list(self, owner): """ Retrieves a user's drive files :return: list of files owned by the user. """ files_list = [] try: r = json.loads(self.call_google_api(service=self.service, q=owner, api_resource="files", api_method="list", response_field=None, corpus="user", spaces="drive")) files_list.extend(r["files"]) if "nextPageToken" in r: next_page_token = r["nextPageToken"] while next_page_token is not None: r = json.loads(self.call_google_api(service=self.service, q=owner, api_resource="files", api_method="list", response_field=None, corpus="user", spaces="drive", pageToken=next_page_token)) files_list.extend(r["files"]) if "nextPageToken" in r: next_page_token = r["nextPageToken"] else: next_page_token = None return files_list except(ValueError, KeyError, TypeError): return None def search_files_list(self, owner="'me' in owners", drive_query=""): """ Searches users drives files :param drive_query: q :param owner: q :return: list of files with name containing the searched term. """ files_list = [] try: file_search = "%s and name contains '%s'" % (owner, drive_query) r = json.loads(self.call_google_api(service=self.service, q=file_search, api_resource="files", api_method="list", response_field=None, corpus="user", spaces="drive")) files_list.extend(r["files"]) if "nextPageToken" in r: next_page_token = r["nextPageToken"] while next_page_token is not None: r = json.loads(self.call_google_api(service=self.service, q=file_search, api_resource="files", api_method="list", response_field=None, corpus="user", spaces="drive", pageToken=next_page_token)) files_list.extend(r["files"]) if "nextPageToken" in r: next_page_token = r["nextPageToken"] else: next_page_token = None return files_list except(ValueError, KeyError, TypeError): return None def get_file_info(self, file_id): """ Lists a file's metadata. :param file_id: fileId :return: files resource """ try: r = json.loads(self.call_google_api(service=self.service, api_resource="files", api_method="get", fields="owners", fileId=file_id, response_field=None)) return r except(ValueError, KeyError, TypeError): return None def transfer_file_owner(self, file_id, user_email): """ Assigned new owner for the file :param file_id: fileId :param user_email: permissionId :return: bool """ permissions_settings = { "kind": "drive#permission", "role": "owner", "type": "user", "emailAddress": user_email } try: r = json.loads(self.call_google_api(service=self.service, api_resource="permissions", api_method="create", response_field="role", fileId=file_id, transferOwnership=True, body=permissions_settings)) if r == "owner": return True else: return False except(ValueError, KeyError, TypeError): return False