With the ROBLOX Open Cloud API, I have been having some issues, mostly with sending POST requests from Python.
Datastore
I am not sure how to pass data to the datastores. Below is a code snippet showing how my current datastore request is structured:
async def create_key(self, datastore: str, key: str, data, scope=None):
    """Create a new datastore entry by POSTing JSON-encoded ``data``.

    The request is sent to ``self._objects_url`` with ``exclusiveCreate``
    set to ``'true'``.

    :param datastore: value sent as the ``datastoreName`` query parameter.
    :param key: value sent as the ``entryKey`` query parameter.
    :param data: any JSON-serialisable value; it is dumped with ``json.dumps``.
    :param scope: optional value for the ``scope`` query parameter; omitted
        from the request when falsy.
    :return: the ``aiohttp`` response object. Its body is read before the
        session is closed, so ``await resp.json()`` / ``await resp.text()``
        can still be called by the caller afterwards.
    """
    params = {"datastoreName": datastore, "entryKey": key, "exclusiveCreate": "true"}
    if scope:
        params["scope"] = scope

    encoded = json.dumps(data).encode()

    # The header must carry the base64 text itself. ``b64encode`` returns
    # bytes, and ``str(bytes)`` yields the repr "b'...'" rather than the
    # text, which makes the checksum invalid -- decode it instead.
    checksum = base64.b64encode(hashlib.md5(encoded).digest()).decode("ascii")

    # NOTE(review): ``self._H()`` presumably returns a fresh dict of the
    # authentication headers -- confirm it is safe to mutate.
    headers = self._H()
    headers["content-md5"] = checksum
    headers["Content-Type"] = "application/json"

    # ``async with`` closes the session even if the request raises.
    async with aiohttp.ClientSession(headers=headers) as session:
        req = await session.post(self._objects_url, params=params, data=encoded)
        # Cache the body now; it cannot be fetched once the session is closed.
        await req.read()
    return req
I have always had difficulty sending data to create a key. The other requests work.
Here's an example:
# Bot command: ban a Roblox user by writing an entry to the "banData"
# datastore, unless an entry for that user already exists.
# ``user`` is either a Roblox id (int) or a username (str); ``reason`` is the
# rest of the command text.
# Documented with comments rather than a docstring so the command's help
# text is left unchanged.
# The leading "@" is required: without it the decorator call is evaluated
# and thrown away, and the command is never registered.
@client.command(name="gameban", description="Game Ban a user from the game using either their username or roblox id.")
async def gameban(ctx, user: typing.Union[int, str], *, reason):
    universe = await roblox_client.get_universe(2806753119)
    em = Embed(title="TVL | Ban System", color=ctx.author.color)
    data = DataStores2(universe.id)

    # Resolve the invoking author to a Roblox user for the ban record.
    bannedby = await get_user(ctx.author.id)
    bannedby = await roblox_client.get_user(bannedby)
    bannedby = f"{bannedby.name} ({bannedby.id})"
    datastore = "banData"

    # An int is looked up as a Roblox id, anything else as a username.
    if isinstance(user, int):
        user = await roblox_client.get_user(user)
    else:
        user = await roblox_client.get_user_by_username(user)

    # Check if banned: a NOT_FOUND error means there is no entry yet.
    check = await data.get(datastore, str(user.id))
    if "error" in check and check["error"] == "NOT_FOUND":
        ban_data = {"reason": reason, "BannedBy": f"{bannedby}", "PlayerName": f"{user.name}"}
        req = await data.create_key(datastore, str(user.id), ban_data)
        print(req, "pp")
        em.description = "A user has been banned"
        em.add_field(name="Reason", value=reason, inline=False)
        em.add_field(name="Banned by", value=f"{ctx.author.mention}/{bannedby}")
        return await ctx.reply(embed=em)

    em.description = f"{user.name} is already banned"
    await ctx.reply(embed=em)
However, this does not work because it also throws an error.
MessagingService
Another good one that I mainly use is the MessagingService API. I can use it very well; however, the way I usually use it is a bit weird, because in my games I use regex to extract what I need from the message. Below is a code snippet:
async def publish(self, topic, message):
    """Publish a message to live game servers.

    :param topic: str, the topic the in-game MessagingService is subscribed to.
    :param message: either a ready-made request body (a dict, normally
        ``{'message': ...}``) or any other value, which is wrapped as
        ``{'message': value}``.
    :return: the ``aiohttp`` response object; its body is read before the
        session is closed so the caller can still decode it.

    If the ``'message'`` value is not a string (for example a nested dict
    such as ``{'announcer': 'xKen_t', 'message': 'Welcome!'}``) it is
    serialised with ``json.dumps``, because the endpoint expects the message
    content to be a string. The game side can then decode it with
    ``HttpService:JSONDecode`` instead of parsing it with patterns.
    """
    import json  # local import: only this block is known to need it here

    headers = self._H()
    headers["Content-Type"] = "application/json"

    data = message if isinstance(message, dict) else {"message": message}

    payload = data.get("message")
    if payload is not None and not isinstance(payload, str):
        # Build a new dict so the caller's argument is not mutated.
        data = {**data, "message": json.dumps(payload)}

    async with aiohttp.ClientSession() as session:
        req = await session.post(f"{self._base_url}/topics/{topic}", json=data, headers=headers)
        # Cache the body now; it cannot be fetched once the session is closed.
        await req.read()
    return req
This is how my requests are sent:
# Bot command: publish an announcement to the "specialannouncements" topic.
# The author must pass the verification check and hold a rank of at least
# 160 in group 5809933; on success the announcement embed is sent both as a
# reply and to the log channel.
# Documented with comments rather than a docstring so the command's help
# text is left unchanged.
@client.command(name="gameannounce",aliases=['gann','ann'],description="Makes a game announcement")
async def g_announce(ctx, *, message):
    verified = await isVerified(ctx.author.id)
    if not verified:
        return await ctx.reply("**:exclamation: You are not verified, run `!cmiverify` to be verified**")

    messenger = MessagingService(2470854020)
    log_channel = client.get_channel(1001962262317244526)

    # Map the author to their Roblox account, then to their group membership.
    roblox_id = await get_user(ctx.author.id)
    announcer = await roblox_client.get_user(roblox_id)
    group = roblox_client.get_base_group(5809933)
    member = group.get_member(announcer.id)

    # Check if HD+
    rank = await checkRank(group, member, 0)
    if rank < 160:
        return await ctx.reply("**:exclamation: You do not permission to use this command**")

    print(message)
    # The announcer's name is embedded as a tag in front of the message text.
    announcement = f"<announcer:{announcer.name}>{message}"
    response = await messenger.publish("specialannouncements", {'message': announcement})
    print(response)
    print(response.status)

    if response.status != 200:
        return await ctx.reply("**:exclamation: An error has occured and could not be announced at this time**")

    em = Embed(title="Game Announcement", color=ctx.author.color)
    em.add_field(name="Announced by:", value=ctx.author.mention, inline=False)
    em.add_field(name="Announcement:", value=message)
    await ctx.reply("Announced!", embed=em)
    await log_channel.send(embed=em)
Sending it like this is a pain: the message is formatted as f"announcement:xKen_t:Welcome to ROBLOX!". Is there a way to send it using a dict like this instead: announcement = {'message': {'announcer': 'xKen_t', 'message': "Welcome to ROBLOX!"}}
Could anyone assist me with this?
