Reference for the technical implementation of the comms.RedisComm project code.

RedisComm

Redis class used to communicate local insights to the Redis instance running on the manager node.

Attributes:

Name             Type      Description
scarletName      string    name of the communication primitive
address          string    a unique address identifying this node
identity_config  dict      the current node's identity, as read from the identity file
contract         Contract  the contract object that handles communication with the SmartContracts
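
The source listing further down shows that the identity file path comes from the `ID_FILE` environment variable, with `/tmp/identity.yaml` as the fallback. A minimal sketch of that lookup follows; `resolve_identity_path` and `DEFAULT_ID_FILE` are hypothetical names used for illustration and are not part of the class.

```python
import os

# Default path used when ID_FILE is not set (taken from readIdentityFile).
DEFAULT_ID_FILE = "/tmp/identity.yaml"


def resolve_identity_path(environ=None):
    """Return the identity file path, preferring the ID_FILE variable.

    `environ` can be any mapping; it defaults to the process environment.
    """
    if environ is None:
        environ = os.environ
    return environ.get("ID_FILE", DEFAULT_ID_FILE)


if __name__ == "__main__":
    # With no override, the default path is used.
    print(resolve_identity_path({}))
    # With ID_FILE set, the override wins.
    print(resolve_identity_path({"ID_FILE": "/etc/node/identity.yaml"}))
```

Passing the environment as a mapping keeps the lookup easy to exercise without modifying the real process environment.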

Methods

  • _verifyScarletParameters() Verifies whether scarlet parameters match

  • loadContract() Loads contract details from remote DB

  • Pull(modelLocal, key="0x0") Pull the global model from chain and update the local model

  • Push(modelLocal, key="0x0") Push the local model to the chain.

Source code in comms/RedisComm.py
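import logging
import os
import pickle
import zlib

import click
import yaml

# RedisContract is defined elsewhere in the project; its import is not shown in this listing.


class RedisComm:
    """
    Redis class used to communicate local insights to the Redis instance running on the manager node.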

    Attributes
    ----------

    scarletName : string
        name of communication primitive

    address : string
        a unique address for identifying this node

    identity_config : dict
        the current node's identity, as read from the identity file

    contract : Contract
        the contract object that handles communication with the SmartContracts

    Methods
    -------

    * `_verifyScarletParameters()`
        Verifies whether scarlet parameters match

    * `loadContract()`
        Loads contract details from remote DB

    * `Pull(modelLocal, key="0x0")`
        Pull the global model from chain and update the local model

    * `Push(modelLocal, key="0x0")`
        Push the local model to the chain.

    """

    def __init__(self, scarletName):

        self.scarletName = scarletName
        self.contract = None
        self.address = ""

        self.identity_config = self.readIdentityFile()
        self.address = self.identity_config["address"]

    def readIdentityFile(self):

        # The ID_FILE environment variable overrides the default path.
        id_file = os.environ.get("ID_FILE", "/tmp/identity.yaml")

        try:
            # The context manager closes the file even if parsing fails.
            with open(id_file, "r") as stream:
                return yaml.safe_load(stream)

        except yaml.YAMLError as exception:
            click.echo(
                click.style(
                    "failure reading identity file : {} with exception: {}".format(
                        id_file, exception
                    ),
                    fg="red",
                )
            )
            return None



    def loadContract(self):

        # initialize contract
        self.contract = RedisContract(
            self.scarletName
        )

    def Pull(
        self,
        modelLocal,
        key="0x0"
    ):

        """
        Pull the global model from chain and update the local model.

        Parameters
        ----------
        modelLocal : numpy array
            A unidimensional numpy array representing the local estimate
        key : string
            Used as key for Mapper

        Returns
        -------
        modelOut:
            The model pulled from the chain, or modelLocal unchanged if no chunk exists for key
        numUpdatedChunks:
            True if the chunk was found and loaded, False otherwise

        """

        # Push stores the whole model as chunk 0, so only that chunk is checked.
        val = self.contract.checkChunkExists(key, 0)
        if val:

            modelBytes = self.contract.getChunk(key, 0)

            # Reverse the steps applied in Push: decompress, then unpickle.
            modelBytes = zlib.decompress(modelBytes)
            modelOut = pickle.loads(modelBytes)

            return modelOut, True
        else:
            logging.error("chunk key: {} not found".format(key))
            return modelLocal, False

    def Push(self, modelLocal, key="0x0"):
        """
        Push the local model to the chain.

        Parameters
        ----------
        modelLocal : numpy array
            A unidimensional numpy array representing the local estimate
        key : string
            Used as key for Mapper

        Returns
        -------
        successChunksList:
            List with one element, the status of the push (truthy on success)
        """

        # Serialize the model, then compress it at the highest zlib level.
        modelBinCompr = pickle.dumps(modelLocal, protocol=pickle.HIGHEST_PROTOCOL)
        modelBinCompr = zlib.compress(modelBinCompr, level=9)

        # The whole model is stored as a single chunk (index 0), tagged with this node's address.
        status, exception = self.contract.setChunk(
            key, 0, modelBinCompr, self.address
        )

        if not status:
            logging.error(
                "failed to set chunk for key: {} ({})".format(key, exception)
            )

        return [status]
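
The Push and Pull methods above can be exercised without a Redis server by swapping in a stand-in for the contract. The sketch below assumes only the three contract calls visible in the listing (`setChunk`, `checkChunkExists`, `getChunk`) and the argument order used there; `InMemoryContract`, `push` and `pull` are hypothetical names, and a plain list stands in for the numpy array.

```python
import pickle
import zlib


class InMemoryContract:
    """Hypothetical stand-in for RedisContract, backed by a dict."""

    def __init__(self):
        self._chunks = {}

    def setChunk(self, key, index, data, address):
        # Returns a (status, exception) pair, the shape Push unpacks.
        self._chunks[(key, index)] = data
        return True, None

    def checkChunkExists(self, key, index):
        return (key, index) in self._chunks

    def getChunk(self, key, index):
        return self._chunks[(key, index)]


def push(contract, model_local, address, key="0x0"):
    # Same steps as Push: pickle, compress, store as chunk 0.
    payload = pickle.dumps(model_local, protocol=pickle.HIGHEST_PROTOCOL)
    payload = zlib.compress(payload, level=9)
    status, _exception = contract.setChunk(key, 0, payload, address)
    return [status]


def pull(contract, model_local, key="0x0"):
    # Same steps as Pull: fall back to the local model if chunk 0 is missing.
    if contract.checkChunkExists(key, 0):
        payload = zlib.decompress(contract.getChunk(key, 0))
        return pickle.loads(payload), True
    return model_local, False


if __name__ == "__main__":
    contract = InMemoryContract()
    print(pull(contract, [0.0], key="demo"))      # nothing stored yet
    print(push(contract, [1.0, 2.0], "node-1", key="demo"))
    print(pull(contract, [0.0], key="demo"))      # stored model comes back
```

Before anything is pushed, `pull` hands back the local model together with `False`, which is the fallback path of the original method.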

Pull(modelLocal, key='0x0')

Pull the global model from chain and update the local model.

Parameters:

Name        Type         Description                                                   Default
modelLocal  numpy array  A unidimensional numpy array representing the local estimate  required
key         string       Used as key for Mapper                                        '0x0'

Returns:

Name              Type  Description
modelOut                The model pulled from the chain, or modelLocal unchanged if no chunk exists for key
numUpdatedChunks  bool  True if the chunk was found and loaded, False otherwise

Source code in comms/RedisComm.py
def Pull(
    self,
    modelLocal,
    key="0x0"
):

    """
    Pull the global model from chain and update the local model.

    Parameters
    ----------
    modelLocal : numpy array
        A unidimensional numpy array representing the local estimate
    key : string
        Used as key for Mapper

    Returns
    -------
    modelOut:
        The model pulled from the chain, or modelLocal unchanged if no chunk exists for key
    numUpdatedChunks:
        True if the chunk was found and loaded, False otherwise

    """

    # Push stores the whole model as chunk 0, so only that chunk is checked.
    val = self.contract.checkChunkExists(key, 0)
    if val:

        modelBytes = self.contract.getChunk(key, 0)

        # Reverse the steps applied in Push: decompress, then unpickle.
        modelBytes = zlib.decompress(modelBytes)
        modelOut = pickle.loads(modelBytes)

        return modelOut, True
    else:
        logging.error("chunk key: {} not found".format(key))
        return modelLocal, False

Push(modelLocal, key='0x0')

Push the local model to the chain.

Parameters:

Name        Type         Description                                                   Default
modelLocal  numpy array  A unidimensional numpy array representing the local estimate  required
key         string       Used as key for Mapper                                        '0x0'

Returns:

Name               Type  Description
successChunksList  list  List with one element, the status of the push (truthy on success)
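
Push serializes the model with pickle and then compresses it with zlib at level 9 before storing it. The sketch below isolates that encoding step to show its effect on payload size; `encode_model` and `decode_model` are hypothetical helper names, and a plain list of floats stands in for the numpy array.

```python
import pickle
import zlib


def encode_model(model):
    """Pickle the model, then compress it at the highest zlib level."""
    raw = pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL)
    return zlib.compress(raw, level=9)


def decode_model(blob):
    """Inverse of encode_model: decompress, then unpickle."""
    return pickle.loads(zlib.decompress(blob))


if __name__ == "__main__":
    model = [0.0] * 10_000  # highly repetitive, so it compresses well
    raw_size = len(pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL))
    blob = encode_model(model)
    print("pickled bytes:", raw_size)
    print("compressed bytes:", len(blob))
```

How much the payload shrinks depends on the data; a model with little repetition may compress far less than this example.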

Source code in comms/RedisComm.py
def Push(self, modelLocal, key="0x0"):
    """
    Push the local model to the chain.

    Parameters
    ----------
    modelLocal : numpy array
        A unidimensional numpy array representing the local estimate
    key : string
        Used as key for Mapper

    Returns
    -------
    successChunksList:
        List with one element, the status of the push (truthy on success)
    """

    # Serialize the model, then compress it at the highest zlib level.
    modelBinCompr = pickle.dumps(modelLocal, protocol=pickle.HIGHEST_PROTOCOL)
    modelBinCompr = zlib.compress(modelBinCompr, level=9)

    # The whole model is stored as a single chunk (index 0), tagged with this node's address.
    status, exception = self.contract.setChunk(
        key, 0, modelBinCompr, self.address
    )

    if not status:
        logging.error(
            "failed to set chunk for key: {} ({})".format(key, exception)
        )

    return [status]