# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from google import protobuf
from gcloud import bigtable
_COLUMN_FAMILY_ID = 'cf1'
class BigTableClient(object):
"""Defines the big table client that connects to the big table.
Attributes:
_column_family_id: A String for family of columns.
_client: An instance of Client which is project specific.
_client_instance: Representation of a Google Cloud Bigtable Instance.
_start_index: Start index for the row key. It gets incremented as we
dequeue.
_end_index : End index for row key. This is incremented as we Enqueue.
_table_name: A string that represents the big table.
_table_instance: An instance of the Table that represents the big table.
"""
def __init__(self, table, project_id):
self._column_family_id = _COLUMN_FAMILY_ID
self._client = bigtable.Client(project=project_id, admin=True)
self._client_instance = None
self._start_index = 0
self._end_index = 0
self._table_name = table
self._table_instance = None
# Start client to enable receiving requests
self.StartClient()
def StartClient(self, instance_id):
"""Starts client to prepare it to make requests."""
# Start the client
if not self._client.is_started():
self._client.start()
self._client_instance = self._client.instance(instance_id)
if self._table_instance is None:
self._table_instance = self._client_instance.table(self._table_name)
def StopClient(self):
"""Stop client to close all the open gRPC clients."""
# stop client
self._client.stop()
def CreateTable(self):
"""Creates a table in which read/write operations are performed.
Raises:
AbortionError: Error occurred when creating table is not successful.
This could be due to creating a table with a duplicate name.
"""
# Create a table
logging.debug('Creating the table %s', self._table_name)
self._table_instance.create()
cf1 = self._table_instance.column_family(self._column_family_id)
cf1.create()
def Enqueue(self, messages, column_id):
"""Writes new rows to the given table.
Args:
messages: An array of strings that represents the message to be
written to a new row in the table. Each message is writte to a
new row
column_id: A string that represents the name of the column to which
data is to be written.
"""
# Start writing rows
logging.debug('Writing to the table : %s, column : %s', self._table_name,
column_id)
for value in messages:
row_key = str(self._end_index)
self._end_index = self._end_index + 1
row = self._table_instance.row(row_key)
row.set_cell(self._column_family_id, column_id.encode('utf-8'),
value.encode('utf-8'))
row.commit()
# End writing rows
def Dequeue(self):
"""Removes and returns the first row from the table.
Returns:
row: A row object that represents the top most row.
"""
if self._end_index < self._start_index:
return
logging.info('Getting a single row by row key.')
key = str(self._start_index)
row_cond = self._table_instance.row(key)
top_row = row_cond
row_cond.delete()
self._start_index = self._start_index + 1
return top_row
def DeleteTable(self):
"""Performs delete operation for a given table."""
# Delete the table
logging.debug('Deleting the table : %s', self._table_name)
self._table_instance.delete()