import csv
import os.path
import pickle
import time

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build


# CSV file of the address book that main() reads and uploads.
CONTACT_CSV_FILE = 'googlecontacts.csv'

# OAuth scope requested when authorizing the People API client.
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/contacts']

# Client-secrets file passed to InstalledAppFlow.from_client_secrets_file().
CLIENT_SECRET_FILE = 'credentials.json'


class ImportContacts(object):
    """Load contact rows from a CSV address-book file."""

    def read_contacts(self, file_name, encoding=None):
        """Return every data row of the CSV file as a list of lists.

        The first line is treated as a header and discarded.  An empty file
        yields an empty list instead of raising ``StopIteration``.

        Args:
            file_name: Path of the CSV file to read.
            encoding: Text encoding passed to ``open``.  ``None`` (the
                default) keeps the platform default encoding.

        Returns:
            A list holding one list of column strings per contact row.
        """
        # newline='' lets the csv module interpret line endings itself; without
        # it, quoted fields that contain embedded newlines are not read
        # back faithfully.
        with open(file_name, 'r', newline='', encoding=encoding) as f:
            csv_reader = csv.reader(f)
            next(csv_reader, None)  # Skip the header line, if there is one
            return list(csv_reader)

    def print_all_contacts(self, file_name):
        """Print each contact row of ``file_name`` prefixed by a 1-based counter."""
        for i, csv_contact in enumerate(self.read_contacts(file_name), start=1):
            print('no.%s %s' % (i, csv_contact))


class QuickstartMod(object):
    """Small wrapper around the Google People API for bulk contact upload and removal."""

    def __init__(self):
        """Obtain OAuth credentials and build the People API client.

        Cached credentials are loaded from ``token.pickle`` in the working
        directory.  When they are missing or not valid they are either
        refreshed or obtained through ``InstalledAppFlow.run_local_server``
        using ``CLIENT_SECRET_FILE`` and ``SCOPES``, and then written back to
        ``token.pickle``.  The client is stored in ``self.service``.
        """
        creds = None
        # The file token.pickle stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first time.
        if os.path.exists('token.pickle'):
            # NOTE(security): unpickling can execute arbitrary code, so
            # token.pickle must only ever be a file written by this script.
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    CLIENT_SECRET_FILE, SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)
        self.service = build('people', 'v1', credentials=creds)

    def get_all_contacts(self):
        """Return all connections of ``people/me`` as a list of person dicts.

        Pages of up to 1000 connections are requested until the response no
        longer carries a ``nextPageToken``.  Only the ``names`` and
        ``emailAddresses`` person fields are requested.
        """
        connections_list = []
        next_page_token = ''  # empty token requests the first page
        while next_page_token is not None:
            # Call the People API
            results = self.service.people().connections().list(
                resourceName='people/me',
                pageSize=1000,
                personFields='names,emailAddresses',
                pageToken=next_page_token,
            ).execute()
            connections_list.extend(results.get('connections', []))
            next_page_token = results.get('nextPageToken')
        return connections_list

    def print_all_contacts(self):
        """Print a numbered line with display name and resource name per connection."""
        connections_list = self.get_all_contacts()
        if not connections_list:
            print('No connections found.')
            return
        for i, person in enumerate(connections_list, start=1):
            resource_name = person.get('resourceName', [])
            names = person.get('names', [])
            display_name = names[0].get('displayName') if names else None
            print('no.%s %s %s' % (i, display_name, resource_name))

    def create_contact_body(self, contact):
        """Build the ``createContact`` request body from one CSV row.

        Column positions are hard-coded (indices 1 through 66); presumably
        they match the address-book export this script was written for --
        verify against the actual CSV header.

        Args:
            contact: Sequence of column strings for a single contact.

        Returns:
            A dict with the person fields ``names``, ``emailAddresses``,
            ``phoneNumbers``, ``addresses``, ``organizations``,
            ``biographies`` and ``userDefined``.

        Raises:
            IndexError: If ``contact`` has fewer than 67 columns.
        """
        given_name = contact[1]
        middle_name = contact[2]
        family_name = contact[3]
        notes = contact[25]
        email_work = contact[30]
        email_home = contact[32]
        phone_mobile = contact[34]
        phone_work = contact[36]
        phone_home = contact[38]
        address_home = contact[40]  # The unstructured value of the address
        p_code_home = contact[45]
        address_work = contact[50]  # The unstructured value of the address
        p_code_work = contact[55]
        org_name = contact[58]
        org_title = contact[60]
        department = contact[61]
        # User-defined data, intended to be used for synchronization.
        record_id = contact[65]
        modified_day = contact[66]

        return {
            'names': [{
                'givenName': given_name,
                'familyName': family_name,
                'middleName': middle_name,
            }],
            'emailAddresses': [
                {'value': email_work, 'type': 'work'},
                {'value': email_home, 'type': 'home'},
            ],
            'phoneNumbers': [
                {'value': phone_mobile, 'type': 'mobile'},
                {'value': phone_home, 'type': 'home'},
                {'value': phone_work, 'type': 'work'},
            ],
            'addresses': [
                {'streetAddress': address_home, 'postalCode': p_code_home, 'type': 'home'},
                {'streetAddress': address_work, 'postalCode': p_code_work, 'type': 'work'},
            ],
            # The department belongs to the same organization entry as the
            # name and title; sending it as a separate entry would create a
            # second, name-less organization on the contact.
            'organizations': [{
                'name': org_name,
                'title': org_title,
                'department': department,
            }],
            'biographies': [{'value': notes}],
            'userDefined': [{'key': record_id, 'value': modified_day}],
        }

    @staticmethod
    def _field(contact, index):
        """Return ``contact[index]``, or ``'?'`` when the row is too short."""
        return contact[index] if len(contact) > index else '?'

    # Upload contacts one by one.
    def add_contact(self, contact):
        """Create a single Google contact from one CSV row.

        Args:
            contact: Sequence of column strings for a single contact.

        Returns:
            The response of ``people().createContact`` for the new contact,
            or ``None`` when the row is too short to build a request body
            (an error line is printed in that case).
        """
        try:
            contact_body = self.create_contact_body(contact)
        except IndexError:
            # The row is too short, so the columns used in the message may be
            # missing as well; read them defensively instead of re-raising.
            print('Error id:%s %s %s contains input errors' % (
                self._field(contact, 65),
                self._field(contact, 1),
                self._field(contact, 3),
            ))
            return None
        new_contact = self.service.people().createContact(body=contact_body).execute()
        print('Uploaded id:%s %s %s' % (contact[65], contact[1], contact[3]))
        return new_contact

    def create_contact_group(self, group_name):
        """Create a contact group called ``group_name`` and return its resource name."""
        results = self.service.contactGroups().create(
            body={'contactGroup': {'name': group_name}}).execute()
        return results.get('resourceName', [])

    def delete_all_contacts(self):
        """Delete every connection of the authorized user.

        All connections are added one by one to a temporary group named
        ``'temp'``; that group is then deleted with ``deleteContacts=True``,
        which removes its members along with it.
        """
        group_id = self.create_contact_group('temp')
        print('making a temporary group to delete...')
        # Presumably gives the newly created group time to become usable
        # before members are added -- TODO confirm the delay is required.
        time.sleep(5)
        print('waiting ...')
        time.sleep(5)
        connections = self.get_all_contacts()
        if connections:
            for person in connections:
                resource_name = person.get('resourceName', [])
                self.service.contactGroups().members().modify(
                    resourceName=group_id,
                    body={'resourceNamesToAdd': [resource_name]},
                ).execute()
                print(resource_name + ' moved to the temporary group')
        else:
            print('No connections found.')
        # The temporary group is removed in both cases so it never lingers.
        self.service.contactGroups().delete(
            resourceName=group_id, deleteContacts=True).execute()
        print('All contacts have been deleted.')


def main():
    """Upload every contact row of ``CONTACT_CSV_FILE`` through the People API."""
    read_contacts = ImportContacts()
    contacts_list = read_contacts.read_contacts(CONTACT_CSV_FILE)
    sync_contacts = QuickstartMod()
    for contact in contacts_list:
        sync_contacts.add_contact(contact)
    # NOTE: sync_contacts.delete_all_contacts() removes every contact of the
    # account; it is intentionally not called here.


if __name__ == '__main__':
    main()