Reference for the technical implementation of the comms.Mapper project code.

Mapper

Bases: RedisComm

Mapper Scarlet maps keys to values. Values could be anything such as model parameters, arrays etc. This Mapper inherits from multiple classes. However, at run time depending on the mode of the scarlet declaration, the classes are initialized.

Attributes:

Name Type Description
key str

A unique string used to identify edge device

Methods

  • refresh() refresh contract details

  • _registerNewKey(key) registers new key by calling the corresponding underlying contract function

  • Map(modelLocal,key) maps model parameters to a given key

  • AllGather(modelLocal) Performs an AllGather operation in which all the key-value pairs are obtained.

  • Reduce(modelLocal,op) Performs a Reduce operation which comprises of an AllGather followed by an operation on all the values obtained thus far.

  • resetAll(modelLocal) Resets all the key-value pairs

Source code in comms/Mapper.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class Mapper(RedisComm):
    """
    Mapper Scarlet maps keys to values. Values could be anything such as model parameters, arrays etc.
    This Mapper inherits from multiple classes. However, at run time depending on the mode of the scarlet declaration,
    the classes are initialized.

    Attributes
    ----------

    key : str
        A unique string used to identify edge device

    Methods
    -------

    * `refresh()`
        refresh contract details

    * `_registerNewKey(key)`
        registers new key by calling the corresponding underlying contract function

    * `Map(modelLocal,key)`
        maps model parameters to a given key

    * `AllGather(modelLocal)`
        Performs an AllGather operation in which all the key-value pairs are obtained.

    * `Reduce(modelLocal,op)`
        Performs a Reduce operation which comprises of an AllGather followed by an operation on all the values
        obtained thus far.

    * `resetAll(modelLocal)`
        Resets all the key-value pairs
    """

    def __init__(self, scarletName):

        RedisComm.__init__(self,scarletName)
        self.key = str(self.address)

        self.refresh()

        self.SUM = operator.add
        self.MUL = operator.mul
        self.MAX = np.maximum
        self.MIN = np.minimum
        self.opArray = [self.SUM, self.MUL, self.MAX, self.MIN]

    def performOperation(self, modelLocal, globalModel, operation):
        """
        performs the given operation
        Parameters
        ----------
        modelLocal : numpy array
            Argument 1 of the operation
        globalModel : numpy array
            Argument 2 of the operation
        Returns
        ----------
        retval : return value is None if the operation is not supported, else it returns the result
        """
        if operation not in self.opArray:
            return None
        else:
            return operation(modelLocal, globalModel)


    def refresh(self):
        """refresh contract details"""
        self.loadContract()

        self.key = str(self.address)

        if not self._registerNewKey(self.key):
            logging.critical("Key: {} already being used".format(self.key))

    def _registerNewKey(self, key):
        """registers new key by calling the corresponding underlying contract function

        Attributes
        ----------
        key : string
            value of key to be registered

        """
        keyRegisterSuccess = self.contract.registerNewKey(key)
        return keyRegisterSuccess

    def Map(self, modelLocal, key):

        """maps model parameters to a given key

        Attributes
        ----------
        modelLocal : numpy array
            the content concerning modelLocal

        key : string
            value of key to be registered

        Returns
        -------

        successChunksList : list
            concerns the chunks which were successfully mapped

        status : boolean
            status of the Map operation

        exception : Exception
            exception if any else it will be None

        """

        successChunksList = []
        try:

            self.refresh()
            successChunksList = self.Push(modelLocal, key)
        except Exception as exception:
            logging.error("{}.Map failed", self.scarletName)
            return successChunksList, False, exception
        return successChunksList, True, None

    def AllGather(self, modelLocal):
        """Performs an AllGather operation in which all the key-value pairs are obtained from the decentralized
        infrastructure.

        Attributes
        ----------
        modelLocal : numpy array
            the content concerning modelLocal

        Returns
        -------

        allgather_dict : dict
            the dictionary containing all key value pairs

        status : boolean
            status of the Map operation

        exception : Exception
            exception if any else it will be None

        """
        allgather_dict = {}
        try:

            self.refresh()

            mapperLength = self.contract.getKeysLength()
            for key_index in range(int(mapperLength)):
                key = self.contract.getKey(key_index)
                modelOut, status = self.Pull(modelLocal, key)
                if not status:
                    logging.error(
                        "{}.AllGather.Pull failed wfor key :{}".format(
                            self.scarletName, key
                        )
                    )
                allgather_dict[key] = modelOut

            return allgather_dict, True, None

        except Exception as exception:
            logging.error(
                "{}.AllGather failed with exception {}".format(
                    self.scarletName, exception
                )
            )
            return allgather_dict, False, exception

    def Reduce(self, modelLocal, op):
        """Performs a Reduce operation which comprises of an AllGather followed by an operation on all the values
        obtained thus far. The choice of operations is SUM,MAX,MIN,MULT. In case of MAX,MIN and MULT it will be an
        element wise operation.

        Attributes
        ----------
        modelLocal : numpy array
            the content concerning modelLocal

        op : operation
            any one of the 4 operations SUM,MAX,MIN,MULT

        Returns
        -------

        sumV : numpy array
            final value after carrying out the operation sequentially on all values.

        status : boolean
            status of the Map operation

        exception : Exception
            exception if any else it will be None

        """
        sumV = modelLocal
        allgather_dict, status, exception = self.AllGather(modelLocal)
        if status:
            for key in allgather_dict.keys():
                sumV = self.performOperation(allgather_dict[key], sumV, op)
            return sumV, status, None
        else:
            return sumV, status, exception

    def resetAll(self, modelLocal):
        """Resets all the key-value pairs

        Attributes
        ----------
        modelLocal : numpy array
            the content concerning modelLocal

        Returns
        -------

        successChunksList : list
            concerns the chunks which were successfully reset

        exception : Exception
            exception if any else it will be None

        """
        successChunksList = []
        try:
            self.refresh()
            mapperLength = self.contract.getKeysLength()
            for key_index in range(int(mapperLength)):
                key = self.contract.getKey(key_index)
                successChunksList = self.Push(modelLocal, key)
        except Exception as exception:
            logging.error("{}.resetAll failed", self.scarletName)
            return successChunksList, exception
        return successChunksList, None

AllGather(modelLocal)

Performs an AllGather operation in which all the key-value pairs are obtained from the decentralized infrastructure.

Attributes:

Name Type Description
modelLocal numpy array

the content concerning modelLocal

Returns:

Name Type Description
allgather_dict dict

the dictionary containing all key value pairs

status : boolean status of the Map operation

exception : Exception exception if any else it will be None

Source code in comms/Mapper.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def AllGather(self, modelLocal):
    """Performs an AllGather operation in which all the key-value pairs are obtained from the decentralized
    infrastructure.

    Attributes
    ----------
    modelLocal : numpy array
        the content concerning modelLocal

    Returns
    -------

    allgather_dict : dict
        the dictionary containing all key value pairs

    status : boolean
        status of the Map operation

    exception : Exception
        exception if any else it will be None

    """
    allgather_dict = {}
    try:

        self.refresh()

        mapperLength = self.contract.getKeysLength()
        for key_index in range(int(mapperLength)):
            key = self.contract.getKey(key_index)
            modelOut, status = self.Pull(modelLocal, key)
            if not status:
                logging.error(
                    "{}.AllGather.Pull failed wfor key :{}".format(
                        self.scarletName, key
                    )
                )
            allgather_dict[key] = modelOut

        return allgather_dict, True, None

    except Exception as exception:
        logging.error(
            "{}.AllGather failed with exception {}".format(
                self.scarletName, exception
            )
        )
        return allgather_dict, False, exception

Map(modelLocal, key)

maps model parameters to a given key

Attributes:

Name Type Description
modelLocal numpy array

the content concerning modelLocal

key : string value of key to be registered

Returns:

Name Type Description
successChunksList list

concerns the chunks which were successfully mapped

status : boolean status of the Map operation

exception : Exception exception if any else it will be None

Source code in comms/Mapper.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def Map(self, modelLocal, key):

    """maps model parameters to a given key

    Attributes
    ----------
    modelLocal : numpy array
        the content concerning modelLocal

    key : string
        value of key to be registered

    Returns
    -------

    successChunksList : list
        concerns the chunks which were successfully mapped

    status : boolean
        status of the Map operation

    exception : Exception
        exception if any else it will be None

    """

    successChunksList = []
    try:

        self.refresh()
        successChunksList = self.Push(modelLocal, key)
    except Exception as exception:
        logging.error("{}.Map failed", self.scarletName)
        return successChunksList, False, exception
    return successChunksList, True, None

Reduce(modelLocal, op)

Performs a Reduce operation which comprises of an AllGather followed by an operation on all the values obtained thus far. The choice of operations is SUM,MAX,MIN,MULT. In case of MAX,MIN and MULT it will be an element wise operation.

Attributes:

Name Type Description
modelLocal numpy array

the content concerning modelLocal

op : operation any one of the 4 operations SUM,MAX,MIN,MULT

Returns:

Name Type Description
sumV numpy array

final value after carrying out the operation sequentially on all values.

status : boolean status of the Map operation

exception : Exception exception if any else it will be None

Source code in comms/Mapper.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def Reduce(self, modelLocal, op):
    """Performs a Reduce operation which comprises of an AllGather followed by an operation on all the values
    obtained thus far. The choice of operations is SUM,MAX,MIN,MULT. In case of MAX,MIN and MULT it will be an
    element wise operation.

    Attributes
    ----------
    modelLocal : numpy array
        the content concerning modelLocal

    op : operation
        any one of the 4 operations SUM,MAX,MIN,MULT

    Returns
    -------

    sumV : numpy array
        final value after carrying out the operation sequentially on all values.

    status : boolean
        status of the Map operation

    exception : Exception
        exception if any else it will be None

    """
    sumV = modelLocal
    allgather_dict, status, exception = self.AllGather(modelLocal)
    if status:
        for key in allgather_dict.keys():
            sumV = self.performOperation(allgather_dict[key], sumV, op)
        return sumV, status, None
    else:
        return sumV, status, exception

performOperation(modelLocal, globalModel, operation)

performs the given operation

Parameters:

Name Type Description Default
modelLocal numpy array

Argument 1 of the operation

required
globalModel numpy array

Argument 2 of the operation

required
Returns required
retval return value is None if the operation is not supported, else it returns the result required
Source code in comms/Mapper.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def performOperation(self, modelLocal, globalModel, operation):
    """
    performs the given operation
    Parameters
    ----------
    modelLocal : numpy array
        Argument 1 of the operation
    globalModel : numpy array
        Argument 2 of the operation
    Returns
    ----------
    retval : return value is None if the operation is not supported, else it returns the result
    """
    if operation not in self.opArray:
        return None
    else:
        return operation(modelLocal, globalModel)

refresh()

refresh contract details

Source code in comms/Mapper.py
80
81
82
83
84
85
86
87
def refresh(self):
    """refresh contract details"""
    self.loadContract()

    self.key = str(self.address)

    if not self._registerNewKey(self.key):
        logging.critical("Key: {} already being used".format(self.key))

resetAll(modelLocal)

Resets all the key-value pairs

Attributes:

Name Type Description
modelLocal numpy array

the content concerning modelLocal

Returns:

Name Type Description
successChunksList list

concerns the chunks which were successfully reset

exception : Exception exception if any else it will be None

Source code in comms/Mapper.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def resetAll(self, modelLocal):
    """Resets all the key-value pairs

    Attributes
    ----------
    modelLocal : numpy array
        the content concerning modelLocal

    Returns
    -------

    successChunksList : list
        concerns the chunks which were successfully reset

    exception : Exception
        exception if any else it will be None

    """
    successChunksList = []
    try:
        self.refresh()
        mapperLength = self.contract.getKeysLength()
        for key_index in range(int(mapperLength)):
            key = self.contract.getKey(key_index)
            successChunksList = self.Push(modelLocal, key)
    except Exception as exception:
        logging.error("{}.resetAll failed", self.scarletName)
        return successChunksList, exception
    return successChunksList, None