Redis class used to communicate local insights to Redis running on manager node
Attributes:
| Name |
Type |
Description |
scarletName |
string
|
name of communication primitive |
address : string
a unique address for identifying this node
identity_config : dict
dictionary current node's identity read through the identity file.
contract : Contract
the contract object that handles communication with the SmartContracts
Methods
-
_verifyScarletParameters()
Verifies whether scarlet parameters match
-
loadContract()
Loads contract details from remote DB
-
Pull(modelLocal, key="0x0", calcWD=False, average=False)
Pull the global model from chain and update the local model
-
Push( modelLocal, key="0x0", wait4Tx=None)
Push the local model to the chain.
Source code in comms/RedisComm.py
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 | class RedisComm:
"""
Redis class used to communicate local insights to Redis running on manager node
Attributes
----------
scarletName : string
name of communication primitive
address : string
a unique address for identifying this node
identity_config : dict
dictionary current node's identity read through the identity file.
contract : Contract
the contract object that handles communication with the SmartContracts
Methods
-------
* `_verifyScarletParameters()`
Verifies whether scarlet parameters match
* `loadContract()`
Loads contract details from remote DB
* `Pull(modelLocal, key="0x0", calcWD=False, average=False)`
Pull the global model from chain and update the local model
* `Push( modelLocal, key="0x0", wait4Tx=None)`
Push the local model to the chain.
"""
def __init__(self, scarletName):
self.scarletName = scarletName
self.contract = None
self.address = ""
self.identity_config = self.readIdentityFile()
self.address = self.identity_config["address"]
def readIdentityFile(self):
id_file = "/tmp/identity.yaml"
if "ID_FILE" in os.environ.keys():
id_file = os.environ["ID_FILE"]
stream = open(id_file, "r")
try:
identity_config = yaml.safe_load(stream)
return identity_config
except yaml.YAMLError as exception:
click.echo(
click.style(
"failure opening identity file : {} with exception: {}".format(
config_file, exception
),
fg="red",
)
)
return
def loadContract(self):
# initialize contract
self.contract = RedisContract(
self.scarletName
)
def Pull(
self,
modelLocal,
key="0x0"
):
"""
Pull the global model from chain and update the local model.
Parameters
----------
modelLocal : numpy array
A unidimensional numpy array representing the local estimate
key: string
Used as key for Mapper
calcWD : bool
Boolean indicating whether to calculate weight difference with the global model
average : bool
Boolean indicating whether to average the global model with the local model or not
Returns
-------
modelOut:
The updated model
numUpdatedChunks:
The number of chunks which were successfully pulled from global model
"""
val = self.contract.checkChunkExists(key, 0)
if val:
modelBytes = self.contract.getChunk(key, 0)
modelBytes = zlib.decompress(modelBytes)
modelOut = pickle.loads(modelBytes)
return modelOut, True
else:
logging.error("chunk key: {} not found".format(key))
return modelLocal, False
def Push(self, modelLocal, key="0x0"):
"""
Push the local model to the chain.
Parameters
----------
modelLocal : numpy array
A unidimensional numpy array representing the local estimate
key: string
Used as key for Mapper
wait4Tx (optional): list
contains the wait4Tx bool as well as wait4TxRecieptTime
If empty, the config default is taken
Returns
-------
successChunksList:
List with one element, either 0/1 depending on whether the push was successful or not
"""
# check if any debug values have been sent in wait4Tx
modelBinCompr = pickle.dumps(modelLocal, protocol=pickle.HIGHEST_PROTOCOL)
modelBinCompr = zlib.compress(modelBinCompr, level=9)
status, exception = self.contract.setChunk(
key, 0, modelBinCompr, self.address
)
if not status:
logging.error("fail to set chunk for key: {}".format(key))
return [status]
|
Pull(modelLocal, key='0x0')
Pull the global model from chain and update the local model.
Parameters:
| Name |
Type |
Description |
Default |
modelLocal |
numpy array
|
A unidimensional numpy array representing the local estimate |
required
|
key |
|
Used as key for Mapper |
'0x0'
|
calcWD |
bool
|
Boolean indicating whether to calculate weight difference with the global model |
required
|
average |
bool
|
Boolean indicating whether to average the global model with the local model or not |
required
|
Returns:
| Name | Type |
Description |
modelOut |
|
The updated model |
numUpdatedChunks |
|
The number of chunks which were successfully pulled from global model |
Source code in comms/RedisComm.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 | def Pull(
self,
modelLocal,
key="0x0"
):
"""
Pull the global model from chain and update the local model.
Parameters
----------
modelLocal : numpy array
A unidimensional numpy array representing the local estimate
key: string
Used as key for Mapper
calcWD : bool
Boolean indicating whether to calculate weight difference with the global model
average : bool
Boolean indicating whether to average the global model with the local model or not
Returns
-------
modelOut:
The updated model
numUpdatedChunks:
The number of chunks which were successfully pulled from global model
"""
val = self.contract.checkChunkExists(key, 0)
if val:
modelBytes = self.contract.getChunk(key, 0)
modelBytes = zlib.decompress(modelBytes)
modelOut = pickle.loads(modelBytes)
return modelOut, True
else:
logging.error("chunk key: {} not found".format(key))
return modelLocal, False
|
Push(modelLocal, key='0x0')
Push the local model to the chain.
Parameters:
| Name |
Type |
Description |
Default |
modelLocal |
numpy array
|
A unidimensional numpy array representing the local estimate |
required
|
key |
|
Used as key for Mapper |
'0x0'
|
wait4Tx |
|
contains the wait4Tx bool as well as wait4TxRecieptTime
If empty, the config default is taken |
required
|
Returns:
| Name | Type |
Description |
successChunksList |
|
List with one element, either 0/1 depending on whether the push was successful or not |
Source code in comms/RedisComm.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 | def Push(self, modelLocal, key="0x0"):
"""
Push the local model to the chain.
Parameters
----------
modelLocal : numpy array
A unidimensional numpy array representing the local estimate
key: string
Used as key for Mapper
wait4Tx (optional): list
contains the wait4Tx bool as well as wait4TxRecieptTime
If empty, the config default is taken
Returns
-------
successChunksList:
List with one element, either 0/1 depending on whether the push was successful or not
"""
# check if any debug values have been sent in wait4Tx
modelBinCompr = pickle.dumps(modelLocal, protocol=pickle.HIGHEST_PROTOCOL)
modelBinCompr = zlib.compress(modelBinCompr, level=9)
status, exception = self.contract.setChunk(
key, 0, modelBinCompr, self.address
)
if not status:
logging.error("fail to set chunk for key: {}".format(key))
return [status]
|